reactive_descriptor_service.hpp
Go to the documentation of this file.
00001 //
00002 // reactive_descriptor_service.hpp
00003 // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
00004 //
00005 // Copyright (c) 2003-2008 Christopher M. Kohlhoff (chris at kohlhoff dot com)
00006 //
00007 // Distributed under the Boost Software License, Version 1.0. (See accompanying
00008 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
00009 //
00010 
00011 #ifndef ASIO_DETAIL_REACTIVE_DESCRIPTOR_SERVICE_HPP
00012 #define ASIO_DETAIL_REACTIVE_DESCRIPTOR_SERVICE_HPP
00013 
00014 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
00015 # pragma once
00016 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
00017 
00018 #include "asio/detail/push_options.hpp"
00019 
00020 #include "asio/buffer.hpp"
00021 #include "asio/error.hpp"
00022 #include "asio/io_service.hpp"
00023 #include "asio/detail/bind_handler.hpp"
00024 #include "asio/detail/handler_base_from_member.hpp"
00025 #include "asio/detail/noncopyable.hpp"
00026 #include "asio/detail/service_base.hpp"
00027 #include "asio/detail/descriptor_ops.hpp"
00028 
00029 #if !defined(BOOST_WINDOWS) && !defined(__CYGWIN__)
00030 
00031 namespace asio {
00032 namespace detail {
00033 
00034 template <typename Reactor>
00035 class reactive_descriptor_service
00036   : public asio::detail::service_base<
00037       reactive_descriptor_service<Reactor> >
00038 {
00039 public:
00040   // The native type of a descriptor.
00041   typedef int native_type;
00042 
00043   // The implementation type of the descriptor.
00044   class implementation_type
00045     : private asio::detail::noncopyable
00046   {
00047   public:
00048     // Default constructor.
00049     implementation_type()
00050       : descriptor_(-1),
00051         flags_(0)
00052     {
00053     }
00054 
00055   private:
00056     // Only this service will have access to the internal values.
00057     friend class reactive_descriptor_service<Reactor>;
00058 
00059     // The native descriptor representation.
00060     int descriptor_;
00061 
00062     enum
00063     {
00064       user_set_non_blocking = 1, // The user wants a non-blocking descriptor.
00065       internal_non_blocking = 2  // The descriptor has been set non-blocking.
00066     };
00067 
00068     // Flags indicating the current state of the descriptor.
00069     unsigned char flags_;
00070 
00071     // Per-descriptor data used by the reactor.
00072     typename Reactor::per_descriptor_data reactor_data_;
00073   };
00074 
00075   // The maximum number of buffers to support in a single operation.
00076   enum { max_buffers = 64 < max_iov_len ? 64 : max_iov_len };
00077 
00078   // Constructor.
00079   reactive_descriptor_service(asio::io_service& io_service)
00080     : asio::detail::service_base<
00081         reactive_descriptor_service<Reactor> >(io_service),
00082       reactor_(asio::use_service<Reactor>(io_service))
00083   {
00084   }
00085 
00086   // Destroy all user-defined handler objects owned by the service.
00087   void shutdown_service()
00088   {
00089   }
00090 
00091   // Construct a new descriptor implementation.
00092   void construct(implementation_type& impl)
00093   {
00094     impl.descriptor_ = -1;
00095     impl.flags_ = 0;
00096   }
00097 
00098   // Destroy a descriptor implementation.
00099   void destroy(implementation_type& impl)
00100   {
00101     if (impl.descriptor_ != -1)
00102     {
00103       reactor_.close_descriptor(impl.descriptor_, impl.reactor_data_);
00104 
00105       if (impl.flags_ & implementation_type::internal_non_blocking)
00106       {
00107         ioctl_arg_type non_blocking = 0;
00108         asio::error_code ignored_ec;
00109         descriptor_ops::ioctl(impl.descriptor_,
00110             FIONBIO, &non_blocking, ignored_ec);
00111         impl.flags_ &= ~implementation_type::internal_non_blocking;
00112       }
00113 
00114       asio::error_code ignored_ec;
00115       descriptor_ops::close(impl.descriptor_, ignored_ec);
00116 
00117       impl.descriptor_ = -1;
00118     }
00119   }
00120 
00121   // Assign a native descriptor to a descriptor implementation.
00122   asio::error_code assign(implementation_type& impl,
00123       const native_type& native_descriptor, asio::error_code& ec)
00124   {
00125     if (is_open(impl))
00126     {
00127       ec = asio::error::already_open;
00128       return ec;
00129     }
00130 
00131     if (int err = reactor_.register_descriptor(
00132           native_descriptor, impl.reactor_data_))
00133     {
00134       ec = asio::error_code(err,
00135           asio::error::get_system_category());
00136       return ec;
00137     }
00138 
00139     impl.descriptor_ = native_descriptor;
00140     impl.flags_ = 0;
00141     ec = asio::error_code();
00142     return ec;
00143   }
00144 
00145   // Determine whether the descriptor is open.
00146   bool is_open(const implementation_type& impl) const
00147   {
00148     return impl.descriptor_ != -1;
00149   }
00150 
00151   // Destroy a descriptor implementation.
00152   asio::error_code close(implementation_type& impl,
00153       asio::error_code& ec)
00154   {
00155     if (is_open(impl))
00156     {
00157       reactor_.close_descriptor(impl.descriptor_, impl.reactor_data_);
00158 
00159       if (impl.flags_ & implementation_type::internal_non_blocking)
00160       {
00161         ioctl_arg_type non_blocking = 0;
00162         asio::error_code ignored_ec;
00163         descriptor_ops::ioctl(impl.descriptor_,
00164             FIONBIO, &non_blocking, ignored_ec);
00165         impl.flags_ &= ~implementation_type::internal_non_blocking;
00166       }
00167 
00168       if (descriptor_ops::close(impl.descriptor_, ec) == -1)
00169         return ec;
00170 
00171       impl.descriptor_ = -1;
00172     }
00173 
00174     ec = asio::error_code();
00175     return ec;
00176   }
00177 
00178   // Get the native descriptor representation.
00179   native_type native(const implementation_type& impl) const
00180   {
00181     return impl.descriptor_;
00182   }
00183 
00184   // Cancel all operations associated with the descriptor.
00185   asio::error_code cancel(implementation_type& impl,
00186       asio::error_code& ec)
00187   {
00188     if (!is_open(impl))
00189     {
00190       ec = asio::error::bad_descriptor;
00191       return ec;
00192     }
00193 
00194     reactor_.cancel_ops(impl.descriptor_, impl.reactor_data_);
00195     ec = asio::error_code();
00196     return ec;
00197   }
00198 
00199   // Perform an IO control command on the descriptor.
00200   template <typename IO_Control_Command>
00201   asio::error_code io_control(implementation_type& impl,
00202       IO_Control_Command& command, asio::error_code& ec)
00203   {
00204     if (!is_open(impl))
00205     {
00206       ec = asio::error::bad_descriptor;
00207       return ec;
00208     }
00209 
00210     if (command.name() == static_cast<int>(FIONBIO))
00211     {
00212       if (command.get())
00213         impl.flags_ |= implementation_type::user_set_non_blocking;
00214       else
00215         impl.flags_ &= ~implementation_type::user_set_non_blocking;
00216       ec = asio::error_code();
00217     }
00218     else
00219     {
00220       descriptor_ops::ioctl(impl.descriptor_, command.name(),
00221           static_cast<ioctl_arg_type*>(command.data()), ec);
00222     }
00223     return ec;
00224   }
00225 
00226   // Write some data to the descriptor.
00227   template <typename ConstBufferSequence>
00228   size_t write_some(implementation_type& impl,
00229       const ConstBufferSequence& buffers, asio::error_code& ec)
00230   {
00231     if (!is_open(impl))
00232     {
00233       ec = asio::error::bad_descriptor;
00234       return 0;
00235     }
00236 
00237     // Copy buffers into array.
00238     descriptor_ops::buf bufs[max_buffers];
00239     typename ConstBufferSequence::const_iterator iter = buffers.begin();
00240     typename ConstBufferSequence::const_iterator end = buffers.end();
00241     size_t i = 0;
00242     size_t total_buffer_size = 0;
00243     for (; iter != end && i < max_buffers; ++iter, ++i)
00244     {
00245       asio::const_buffer buffer(*iter);
00246       descriptor_ops::init_buf(bufs[i],
00247           asio::buffer_cast<const void*>(buffer),
00248           asio::buffer_size(buffer));
00249       total_buffer_size += asio::buffer_size(buffer);
00250     }
00251 
00252     // A request to read_some 0 bytes on a stream is a no-op.
00253     if (total_buffer_size == 0)
00254     {
00255       ec = asio::error_code();
00256       return 0;
00257     }
00258 
00259     // Make descriptor non-blocking if user wants non-blocking.
00260     if (impl.flags_ & implementation_type::user_set_non_blocking)
00261     {
00262       if (!(impl.flags_ & implementation_type::internal_non_blocking))
00263       {
00264         ioctl_arg_type non_blocking = 1;
00265         if (descriptor_ops::ioctl(impl.descriptor_,
00266               FIONBIO, &non_blocking, ec))
00267           return 0;
00268         impl.flags_ |= implementation_type::internal_non_blocking;
00269       }
00270     }
00271 
00272     // Send the data.
00273     for (;;)
00274     {
00275       // Try to complete the operation without blocking.
00276       int bytes_sent = descriptor_ops::gather_write(
00277           impl.descriptor_, bufs, i, ec);
00278 
00279       // Check if operation succeeded.
00280       if (bytes_sent >= 0)
00281         return bytes_sent;
00282 
00283       // Operation failed.
00284       if ((impl.flags_ & implementation_type::user_set_non_blocking)
00285           || (ec != asio::error::would_block
00286             && ec != asio::error::try_again))
00287         return 0;
00288 
00289       // Wait for descriptor to become ready.
00290       if (descriptor_ops::poll_write(impl.descriptor_, ec) < 0)
00291         return 0;
00292     }
00293   }
00294 
00295   // Wait until data can be written without blocking.
00296   size_t write_some(implementation_type& impl,
00297       const null_buffers&, asio::error_code& ec)
00298   {
00299     if (!is_open(impl))
00300     {
00301       ec = asio::error::bad_descriptor;
00302       return 0;
00303     }
00304 
00305     // Wait for descriptor to become ready.
00306     descriptor_ops::poll_write(impl.descriptor_, ec);
00307 
00308     return 0;
00309   }
00310 
00311   template <typename ConstBufferSequence, typename Handler>
00312   class write_operation :
00313     public handler_base_from_member<Handler>
00314   {
00315   public:
00316     write_operation(int descriptor, asio::io_service& io_service,
00317         const ConstBufferSequence& buffers, Handler handler)
00318       : handler_base_from_member<Handler>(handler),
00319         descriptor_(descriptor),
00320         io_service_(io_service),
00321         work_(io_service),
00322         buffers_(buffers)
00323     {
00324     }
00325 
00326     bool perform(asio::error_code& ec,
00327         std::size_t& bytes_transferred)
00328     {
00329       // Check whether the operation was successful.
00330       if (ec)
00331       {
00332         bytes_transferred = 0;
00333         return true;
00334       }
00335 
00336       // Copy buffers into array.
00337       descriptor_ops::buf bufs[max_buffers];
00338       typename ConstBufferSequence::const_iterator iter = buffers_.begin();
00339       typename ConstBufferSequence::const_iterator end = buffers_.end();
00340       size_t i = 0;
00341       for (; iter != end && i < max_buffers; ++iter, ++i)
00342       {
00343         asio::const_buffer buffer(*iter);
00344         descriptor_ops::init_buf(bufs[i],
00345             asio::buffer_cast<const void*>(buffer),
00346             asio::buffer_size(buffer));
00347       }
00348 
00349       // Write the data.
00350       int bytes = descriptor_ops::gather_write(descriptor_, bufs, i, ec);
00351 
00352       // Check if we need to run the operation again.
00353       if (ec == asio::error::would_block
00354           || ec == asio::error::try_again)
00355         return false;
00356 
00357       bytes_transferred = (bytes < 0 ? 0 : bytes);
00358       return true;
00359     }
00360 
00361     void complete(const asio::error_code& ec,
00362         std::size_t bytes_transferred)
00363     {
00364       io_service_.post(bind_handler(this->handler_, ec, bytes_transferred));
00365     }
00366 
00367   private:
00368     int descriptor_;
00369     asio::io_service& io_service_;
00370     asio::io_service::work work_;
00371     ConstBufferSequence buffers_;
00372   };
00373 
00374   // Start an asynchronous write. The data being sent must be valid for the
00375   // lifetime of the asynchronous operation.
00376   template <typename ConstBufferSequence, typename Handler>
00377   void async_write_some(implementation_type& impl,
00378       const ConstBufferSequence& buffers, Handler handler)
00379   {
00380     if (!is_open(impl))
00381     {
00382       this->get_io_service().post(bind_handler(handler,
00383             asio::error::bad_descriptor, 0));
00384     }
00385     else
00386     {
00387       // Determine total size of buffers.
00388       typename ConstBufferSequence::const_iterator iter = buffers.begin();
00389       typename ConstBufferSequence::const_iterator end = buffers.end();
00390       size_t i = 0;
00391       size_t total_buffer_size = 0;
00392       for (; iter != end && i < max_buffers; ++iter, ++i)
00393       {
00394         asio::const_buffer buffer(*iter);
00395         total_buffer_size += asio::buffer_size(buffer);
00396       }
00397 
00398       // A request to read_some 0 bytes on a stream is a no-op.
00399       if (total_buffer_size == 0)
00400       {
00401         this->get_io_service().post(bind_handler(handler,
00402               asio::error_code(), 0));
00403         return;
00404       }
00405 
00406       // Make descriptor non-blocking.
00407       if (!(impl.flags_ & implementation_type::internal_non_blocking))
00408       {
00409         ioctl_arg_type non_blocking = 1;
00410         asio::error_code ec;
00411         if (descriptor_ops::ioctl(impl.descriptor_, FIONBIO, &non_blocking, ec))
00412         {
00413           this->get_io_service().post(bind_handler(handler, ec, 0));
00414           return;
00415         }
00416         impl.flags_ |= implementation_type::internal_non_blocking;
00417       }
00418 
00419       reactor_.start_write_op(impl.descriptor_, impl.reactor_data_,
00420           write_operation<ConstBufferSequence, Handler>(
00421             impl.descriptor_, this->get_io_service(), buffers, handler));
00422     }
00423   }
00424 
00425   template <typename Handler>
00426   class null_buffers_operation :
00427     public handler_base_from_member<Handler>
00428   {
00429   public:
00430     null_buffers_operation(asio::io_service& io_service, Handler handler)
00431       : handler_base_from_member<Handler>(handler),
00432         work_(io_service)
00433     {
00434     }
00435 
00436     bool perform(asio::error_code&,
00437         std::size_t& bytes_transferred)
00438     {
00439       bytes_transferred = 0;
00440       return true;
00441     }
00442 
00443     void complete(const asio::error_code& ec,
00444         std::size_t bytes_transferred)
00445     {
00446       work_.get_io_service().post(bind_handler(
00447             this->handler_, ec, bytes_transferred));
00448     }
00449 
00450   private:
00451     asio::io_service::work work_;
00452   };
00453 
00454   // Start an asynchronous wait until data can be written without blocking.
00455   template <typename Handler>
00456   void async_write_some(implementation_type& impl,
00457       const null_buffers&, Handler handler)
00458   {
00459     if (!is_open(impl))
00460     {
00461       this->get_io_service().post(bind_handler(handler,
00462             asio::error::bad_descriptor, 0));
00463     }
00464     else
00465     {
00466       reactor_.start_write_op(impl.descriptor_, impl.reactor_data_,
00467           null_buffers_operation<Handler>(this->get_io_service(), handler),
00468           false);
00469     }
00470   }
00471 
00472   // Read some data from the stream. Returns the number of bytes read.
00473   template <typename MutableBufferSequence>
00474   size_t read_some(implementation_type& impl,
00475       const MutableBufferSequence& buffers, asio::error_code& ec)
00476   {
00477     if (!is_open(impl))
00478     {
00479       ec = asio::error::bad_descriptor;
00480       return 0;
00481     }
00482 
00483     // Copy buffers into array.
00484     descriptor_ops::buf bufs[max_buffers];
00485     typename MutableBufferSequence::const_iterator iter = buffers.begin();
00486     typename MutableBufferSequence::const_iterator end = buffers.end();
00487     size_t i = 0;
00488     size_t total_buffer_size = 0;
00489     for (; iter != end && i < max_buffers; ++iter, ++i)
00490     {
00491       asio::mutable_buffer buffer(*iter);
00492       descriptor_ops::init_buf(bufs[i],
00493           asio::buffer_cast<void*>(buffer),
00494           asio::buffer_size(buffer));
00495       total_buffer_size += asio::buffer_size(buffer);
00496     }
00497 
00498     // A request to read_some 0 bytes on a stream is a no-op.
00499     if (total_buffer_size == 0)
00500     {
00501       ec = asio::error_code();
00502       return 0;
00503     }
00504 
00505     // Make descriptor non-blocking if user wants non-blocking.
00506     if (impl.flags_ & implementation_type::user_set_non_blocking)
00507     {
00508       if (!(impl.flags_ & implementation_type::internal_non_blocking))
00509       {
00510         ioctl_arg_type non_blocking = 1;
00511         if (descriptor_ops::ioctl(impl.descriptor_, FIONBIO, &non_blocking, ec))
00512           return 0;
00513         impl.flags_ |= implementation_type::internal_non_blocking;
00514       }
00515     }
00516 
00517     // Read some data.
00518     for (;;)
00519     {
00520       // Try to complete the operation without blocking.
00521       int bytes_read = descriptor_ops::scatter_read(
00522           impl.descriptor_, bufs, i, ec);
00523 
00524       // Check if operation succeeded.
00525       if (bytes_read > 0)
00526         return bytes_read;
00527 
00528       // Check for EOF.
00529       if (bytes_read == 0)
00530       {
00531         ec = asio::error::eof;
00532         return 0;
00533       }
00534 
00535       // Operation failed.
00536       if ((impl.flags_ & implementation_type::user_set_non_blocking)
00537           || (ec != asio::error::would_block
00538             && ec != asio::error::try_again))
00539         return 0;
00540 
00541       // Wait for descriptor to become ready.
00542       if (descriptor_ops::poll_read(impl.descriptor_, ec) < 0)
00543         return 0;
00544     }
00545   }
00546 
00547   // Wait until data can be read without blocking.
00548   size_t read_some(implementation_type& impl,
00549       const null_buffers&, asio::error_code& ec)
00550   {
00551     if (!is_open(impl))
00552     {
00553       ec = asio::error::bad_descriptor;
00554       return 0;
00555     }
00556 
00557     // Wait for descriptor to become ready.
00558     descriptor_ops::poll_read(impl.descriptor_, ec);
00559 
00560     return 0;
00561   }
00562 
00563   template <typename MutableBufferSequence, typename Handler>
00564   class read_operation :
00565     public handler_base_from_member<Handler>
00566   {
00567   public:
00568     read_operation(int descriptor, asio::io_service& io_service,
00569         const MutableBufferSequence& buffers, Handler handler)
00570       : handler_base_from_member<Handler>(handler),
00571         descriptor_(descriptor),
00572         io_service_(io_service),
00573         work_(io_service),
00574         buffers_(buffers)
00575     {
00576     }
00577 
00578     bool perform(asio::error_code& ec,
00579         std::size_t& bytes_transferred)
00580     {
00581       // Check whether the operation was successful.
00582       if (ec)
00583       {
00584         bytes_transferred = 0;
00585         return true;
00586       }
00587 
00588       // Copy buffers into array.
00589       descriptor_ops::buf bufs[max_buffers];
00590       typename MutableBufferSequence::const_iterator iter = buffers_.begin();
00591       typename MutableBufferSequence::const_iterator end = buffers_.end();
00592       size_t i = 0;
00593       for (; iter != end && i < max_buffers; ++iter, ++i)
00594       {
00595         asio::mutable_buffer buffer(*iter);
00596         descriptor_ops::init_buf(bufs[i],
00597             asio::buffer_cast<void*>(buffer),
00598             asio::buffer_size(buffer));
00599       }
00600 
00601       // Read some data.
00602       int bytes = descriptor_ops::scatter_read(descriptor_, bufs, i, ec);
00603       if (bytes == 0)
00604         ec = asio::error::eof;
00605 
00606       // Check if we need to run the operation again.
00607       if (ec == asio::error::would_block
00608           || ec == asio::error::try_again)
00609         return false;
00610 
00611       bytes_transferred = (bytes < 0 ? 0 : bytes);
00612       return true;
00613     }
00614 
00615     void complete(const asio::error_code& ec,
00616         std::size_t bytes_transferred)
00617     {
00618       io_service_.post(bind_handler(this->handler_, ec, bytes_transferred));
00619     }
00620 
00621   private:
00622     int descriptor_;
00623     asio::io_service& io_service_;
00624     asio::io_service::work work_;
00625     MutableBufferSequence buffers_;
00626   };
00627 
00628   // Start an asynchronous read. The buffer for the data being read must be
00629   // valid for the lifetime of the asynchronous operation.
00630   template <typename MutableBufferSequence, typename Handler>
00631   void async_read_some(implementation_type& impl,
00632       const MutableBufferSequence& buffers, Handler handler)
00633   {
00634     if (!is_open(impl))
00635     {
00636       this->get_io_service().post(bind_handler(handler,
00637             asio::error::bad_descriptor, 0));
00638     }
00639     else
00640     {
00641       // Determine total size of buffers.
00642       typename MutableBufferSequence::const_iterator iter = buffers.begin();
00643       typename MutableBufferSequence::const_iterator end = buffers.end();
00644       size_t i = 0;
00645       size_t total_buffer_size = 0;
00646       for (; iter != end && i < max_buffers; ++iter, ++i)
00647       {
00648         asio::mutable_buffer buffer(*iter);
00649         total_buffer_size += asio::buffer_size(buffer);
00650       }
00651 
00652       // A request to read_some 0 bytes on a stream is a no-op.
00653       if (total_buffer_size == 0)
00654       {
00655         this->get_io_service().post(bind_handler(handler,
00656               asio::error_code(), 0));
00657         return;
00658       }
00659 
00660       // Make descriptor non-blocking.
00661       if (!(impl.flags_ & implementation_type::internal_non_blocking))
00662       {
00663         ioctl_arg_type non_blocking = 1;
00664         asio::error_code ec;
00665         if (descriptor_ops::ioctl(impl.descriptor_, FIONBIO, &non_blocking, ec))
00666         {
00667           this->get_io_service().post(bind_handler(handler, ec, 0));
00668           return;
00669         }
00670         impl.flags_ |= implementation_type::internal_non_blocking;
00671       }
00672 
00673       reactor_.start_read_op(impl.descriptor_, impl.reactor_data_,
00674           read_operation<MutableBufferSequence, Handler>(
00675             impl.descriptor_, this->get_io_service(), buffers, handler));
00676     }
00677   }
00678 
00679   // Wait until data can be read without blocking.
00680   template <typename Handler>
00681   void async_read_some(implementation_type& impl,
00682       const null_buffers&, Handler handler)
00683   {
00684     if (!is_open(impl))
00685     {
00686       this->get_io_service().post(bind_handler(handler,
00687             asio::error::bad_descriptor, 0));
00688     }
00689     else
00690     {
00691       reactor_.start_read_op(impl.descriptor_, impl.reactor_data_,
00692           null_buffers_operation<Handler>(this->get_io_service(), handler),
00693           false);
00694     }
00695   }
00696 
00697 private:
00698   // The selector that performs event demultiplexing for the service.
00699   Reactor& reactor_;
00700 };
00701 
00702 } // namespace detail
00703 } // namespace asio
00704 
00705 #endif // !defined(BOOST_WINDOWS) && !defined(__CYGWIN__)
00706 
00707 #include "asio/detail/pop_options.hpp"
00708 
00709 #endif // ASIO_DETAIL_REACTIVE_DESCRIPTOR_SERVICE_HPP
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines


Castor
Author(s): Carpe Noctem
autogenerated on Fri Nov 8 2013 11:05:39