async_read_buffer.h
Go to the documentation of this file.
00001 
00034 #ifndef ROSSERIAL_SERVER_ASYNC_READ_BUFFER_H
00035 #define ROSSERIAL_SERVER_ASYNC_READ_BUFFER_H
00036 
00037 #include <boost/bind.hpp>
00038 #include <boost/asio.hpp>
00039 #include <boost/function.hpp>
00040 
00041 #include <ros/ros.h>
00042 
00043 namespace rosserial_server
00044 {
00045 
00046 template<typename AsyncReadStream>
00047 class AsyncReadBuffer
00048 {
00049 public:
00050   AsyncReadBuffer(AsyncReadStream& s, size_t capacity,
00051                   boost::function<void(const boost::system::error_code&)> error_callback)
00052        : stream_(s), read_requested_bytes_(0), error_callback_(error_callback) {
00053     reset();
00054     mem_.resize(capacity);
00055     ROS_ASSERT_MSG(error_callback_, "Bad error callback passed to read buffer.");
00056   }
00057 
00062   void read(size_t requested_bytes, boost::function<void(ros::serialization::IStream&)> callback) {
00063     ROS_DEBUG_STREAM_NAMED("async_read", "Buffer read of " << requested_bytes << " bytes, " <<
00064                            "wi: " << write_index_ << ", ri: " << read_index_);
00065 
00066     ROS_ASSERT_MSG(read_requested_bytes_ == 0, "Bytes requested is nonzero, is there an operation already pending?");
00067     ROS_ASSERT_MSG(callback, "Bad read success callback function.");
00068     read_success_callback_ = callback;
00069     read_requested_bytes_ = requested_bytes;
00070 
00071     if (read_requested_bytes_ > mem_.size())
00072     {
00073       // Insufficient room in the buffer for the requested bytes,
00074       ROS_ERROR_STREAM_NAMED("async_read", "Requested to read " << read_requested_bytes_ <<
00075                              " bytes, but buffer capacity is only " << mem_.size() << ".");
00076       error_callback_(boost::system::errc::make_error_code(boost::system::errc::no_buffer_space));
00077       return;
00078     }
00079 
00080     // Number of bytes which must be transferred to satisfy the request.
00081     ssize_t transfer_bytes = read_requested_bytes_ - bytesAvailable();
00082 
00083     if (transfer_bytes > 0)
00084     {
00085       // If we don't have enough headroom in the buffer, we'll have to shift what's currently in there to make room.
00086       if (bytesHeadroom() < transfer_bytes)
00087       {
00088         memmove(&mem_[0], &mem_[read_index_], bytesAvailable());
00089         write_index_ = bytesAvailable();
00090         read_index_ = 0;
00091       }
00092 
00093       // Initiate a read from hardware so that we have enough bytes to fill the user request.
00094       ROS_DEBUG_STREAM_NAMED("async_read", "Requesting transfer of at least " << transfer_bytes << " byte(s).");
00095       boost::asio::async_read(stream_,
00096           boost::asio::buffer(&mem_[write_index_], bytesHeadroom()),
00097           boost::asio::transfer_at_least(transfer_bytes),
00098           boost::bind(&AsyncReadBuffer::callback, this,
00099                       boost::asio::placeholders::error,
00100                       boost::asio::placeholders::bytes_transferred));
00101     }
00102     else
00103     {
00104       // We have enough in the buffer already, can fill the request without going to hardware.
00105       callSuccessCallback();
00106     }
00107   }
00108 
00109 private:
00110   void reset()
00111   {
00112     read_index_ = 0;
00113     write_index_ = 0;
00114   }
00115 
00116   inline size_t bytesAvailable()
00117   {
00118     return write_index_ - read_index_;
00119   }
00120 
00121   inline size_t bytesHeadroom()
00122   {
00123     return mem_.size() - write_index_;
00124   }
00125 
00130   void callback(const boost::system::error_code& error, size_t bytes_transferred)
00131   {
00132     if (error)
00133     {
00134       read_requested_bytes_ = 0;
00135       read_success_callback_.clear();
00136       ROS_DEBUG_STREAM_NAMED("async_read", "Read operation failed with: " << error);
00137 
00138       if (error == boost::asio::error::operation_aborted)
00139       {
00140         // Special case for operation_aborted. The abort callback comes when the owning Session
00141         // is in the middle of teardown, which means the callback is no longer valid.
00142       }
00143       else
00144       {
00145         error_callback_(error);
00146       }
00147       return;
00148     }
00149 
00150     write_index_ += bytes_transferred;
00151     ROS_DEBUG_STREAM_NAMED("async_read", "Successfully read " << bytes_transferred << " byte(s), now " << bytesAvailable() << " available.");
00152     callSuccessCallback();
00153   }
00154 
00159   void callSuccessCallback()
00160   {
00161     ROS_DEBUG_STREAM_NAMED("async_read", "Invoking success callback with buffer of requested size " <<
00162                            read_requested_bytes_ << " byte(s).");
00163 
00164     ros::serialization::IStream stream(&mem_[read_index_], read_requested_bytes_);
00165     read_index_ += read_requested_bytes_;
00166 
00167     // Post the callback rather than executing it here so, so that we have a chance to do the cleanup
00168     // below prior to it actually getting run, in the event that the callback queues up another read.
00169     stream_.get_io_service().post(boost::bind(read_success_callback_, stream));
00170 
00171     // Resetting these values clears our state so that we know there isn't a callback pending.
00172     read_requested_bytes_ = 0;
00173     read_success_callback_.clear();
00174 
00175     if (bytesAvailable() == 0)
00176     {
00177       ROS_DEBUG_STREAM_NAMED("async_read", "Buffer is empty, resetting indexes to the beginning.");
00178       reset();
00179     }
00180   }
00181 
00182   AsyncReadStream& stream_;
00183   std::vector<uint8_t> mem_;
00184 
00185   size_t write_index_;
00186   size_t read_index_;
00187   boost::function<void(const boost::system::error_code&)> error_callback_;
00188 
00189   boost::function<void(ros::serialization::IStream&)> read_success_callback_;
00190   size_t read_requested_bytes_;
00191 };
00192 
00193 }  // namespace
00194 
00195 #endif  // ROSSERIAL_SERVER_ASYNC_READ_BUFFER_H


rosserial_server
Author(s): Mike Purvis
autogenerated on Sat Oct 7 2017 03:08:51