Go to the documentation of this file.00001
00034 #ifndef ROSSERIAL_SERVER_ASYNC_READ_BUFFER_H
00035 #define ROSSERIAL_SERVER_ASYNC_READ_BUFFER_H
00036
00037 #include <boost/bind.hpp>
00038 #include <boost/asio.hpp>
00039 #include <boost/function.hpp>
00040
00041 #include <ros/ros.h>
00042
00043 namespace rosserial_server
00044 {
00045
00046 template<typename AsyncReadStream>
00047 class AsyncReadBuffer
00048 {
00049 public:
00050 AsyncReadBuffer(AsyncReadStream& s, size_t capacity,
00051 boost::function<void(const boost::system::error_code&)> error_callback)
00052 : stream_(s), read_requested_bytes_(0), error_callback_(error_callback) {
00053 reset();
00054 mem_.resize(capacity);
00055 ROS_ASSERT_MSG(error_callback_, "Bad error callback passed to read buffer.");
00056 }
00057
00062 void read(size_t requested_bytes, boost::function<void(ros::serialization::IStream&)> callback) {
00063 ROS_DEBUG_STREAM_NAMED("async_read", "Buffer read of " << requested_bytes << " bytes, " <<
00064 "wi: " << write_index_ << ", ri: " << read_index_);
00065
00066 ROS_ASSERT_MSG(read_requested_bytes_ == 0, "Bytes requested is nonzero, is there an operation already pending?");
00067 ROS_ASSERT_MSG(callback, "Bad read success callback function.");
00068 read_success_callback_ = callback;
00069 read_requested_bytes_ = requested_bytes;
00070
00071 if (read_requested_bytes_ > mem_.size())
00072 {
00073
00074 ROS_ERROR_STREAM_NAMED("async_read", "Requested to read " << read_requested_bytes_ <<
00075 " bytes, but buffer capacity is only " << mem_.size() << ".");
00076 error_callback_(boost::system::errc::make_error_code(boost::system::errc::no_buffer_space));
00077 return;
00078 }
00079
00080
00081 ssize_t transfer_bytes = read_requested_bytes_ - bytesAvailable();
00082
00083 if (transfer_bytes > 0)
00084 {
00085
00086 if (bytesHeadroom() < transfer_bytes)
00087 {
00088 memmove(&mem_[0], &mem_[read_index_], bytesAvailable());
00089 write_index_ = bytesAvailable();
00090 read_index_ = 0;
00091 }
00092
00093
00094 ROS_DEBUG_STREAM_NAMED("async_read", "Requesting transfer of at least " << transfer_bytes << " byte(s).");
00095 boost::asio::async_read(stream_,
00096 boost::asio::buffer(&mem_[write_index_], bytesHeadroom()),
00097 boost::asio::transfer_at_least(transfer_bytes),
00098 boost::bind(&AsyncReadBuffer::callback, this,
00099 boost::asio::placeholders::error,
00100 boost::asio::placeholders::bytes_transferred));
00101 }
00102 else
00103 {
00104
00105 callSuccessCallback();
00106 }
00107 }
00108
00109 private:
00110 void reset()
00111 {
00112 read_index_ = 0;
00113 write_index_ = 0;
00114 }
00115
00116 inline size_t bytesAvailable()
00117 {
00118 return write_index_ - read_index_;
00119 }
00120
00121 inline size_t bytesHeadroom()
00122 {
00123 return mem_.size() - write_index_;
00124 }
00125
00130 void callback(const boost::system::error_code& error, size_t bytes_transferred)
00131 {
00132 if (error)
00133 {
00134 read_requested_bytes_ = 0;
00135 read_success_callback_.clear();
00136 ROS_DEBUG_STREAM_NAMED("async_read", "Read operation failed with: " << error);
00137
00138 if (error == boost::asio::error::operation_aborted)
00139 {
00140
00141
00142 }
00143 else
00144 {
00145 error_callback_(error);
00146 }
00147 return;
00148 }
00149
00150 write_index_ += bytes_transferred;
00151 ROS_DEBUG_STREAM_NAMED("async_read", "Successfully read " << bytes_transferred << " byte(s), now " << bytesAvailable() << " available.");
00152 callSuccessCallback();
00153 }
00154
00159 void callSuccessCallback()
00160 {
00161 ROS_DEBUG_STREAM_NAMED("async_read", "Invoking success callback with buffer of requested size " <<
00162 read_requested_bytes_ << " byte(s).");
00163
00164 ros::serialization::IStream stream(&mem_[read_index_], read_requested_bytes_);
00165 read_index_ += read_requested_bytes_;
00166
00167
00168
00169 stream_.get_io_service().post(boost::bind(read_success_callback_, stream));
00170
00171
00172 read_requested_bytes_ = 0;
00173 read_success_callback_.clear();
00174
00175 if (bytesAvailable() == 0)
00176 {
00177 ROS_DEBUG_STREAM_NAMED("async_read", "Buffer is empty, resetting indexes to the beginning.");
00178 reset();
00179 }
00180 }
00181
00182 AsyncReadStream& stream_;
00183 std::vector<uint8_t> mem_;
00184
00185 size_t write_index_;
00186 size_t read_index_;
00187 boost::function<void(const boost::system::error_code&)> error_callback_;
00188
00189 boost::function<void(ros::serialization::IStream&)> read_success_callback_;
00190 size_t read_requested_bytes_;
00191 };
00192
00193 }
00194
00195 #endif // ROSSERIAL_SERVER_ASYNC_READ_BUFFER_H