async_read_buffer.h
Go to the documentation of this file.
1 
34 #ifndef ROSSERIAL_SERVER_ASYNC_READ_BUFFER_H
35 #define ROSSERIAL_SERVER_ASYNC_READ_BUFFER_H
36 
37 #include <boost/bind.hpp>
38 #include <boost/asio.hpp>
39 #include <boost/function.hpp>
40 
41 #include <ros/ros.h>
42 
44 {
45 
46 template<typename AsyncReadStream>
48 {
49 public:
50  AsyncReadBuffer(AsyncReadStream& s, size_t capacity,
51  boost::function<void(const boost::system::error_code&)> error_callback)
52  : stream_(s), read_requested_bytes_(0), error_callback_(error_callback) {
53  reset();
54  mem_.resize(capacity);
55  ROS_ASSERT_MSG(error_callback_, "Bad error callback passed to read buffer.");
56  }
57 
62  void read(size_t requested_bytes, boost::function<void(ros::serialization::IStream&)> callback) {
63  ROS_DEBUG_STREAM_NAMED("async_read", "Buffer read of " << requested_bytes << " bytes, " <<
64  "wi: " << write_index_ << ", ri: " << read_index_);
65 
66  ROS_ASSERT_MSG(read_requested_bytes_ == 0, "Bytes requested is nonzero, is there an operation already pending?");
67  ROS_ASSERT_MSG(callback, "Bad read success callback function.");
69  read_requested_bytes_ = requested_bytes;
70 
71  if (read_requested_bytes_ > mem_.size())
72  {
73  // Insufficient room in the buffer for the requested bytes,
74  ROS_ERROR_STREAM_NAMED("async_read", "Requested to read " << read_requested_bytes_ <<
75  " bytes, but buffer capacity is only " << mem_.size() << ".");
76  error_callback_(boost::system::errc::make_error_code(boost::system::errc::no_buffer_space));
77  return;
78  }
79 
80  // Number of bytes which must be transferred to satisfy the request.
81  ssize_t transfer_bytes = read_requested_bytes_ - bytesAvailable();
82 
83  if (transfer_bytes > 0)
84  {
85  // If we don't have enough headroom in the buffer, we'll have to shift what's currently in there to make room.
86  if (bytesHeadroom() < transfer_bytes)
87  {
88  memmove(&mem_[0], &mem_[read_index_], bytesAvailable());
90  read_index_ = 0;
91  }
92 
93  // Initiate a read from hardware so that we have enough bytes to fill the user request.
94  ROS_DEBUG_STREAM_NAMED("async_read", "Requesting transfer of at least " << transfer_bytes << " byte(s).");
95  boost::asio::async_read(stream_,
96  boost::asio::buffer(&mem_[write_index_], bytesHeadroom()),
97  boost::asio::transfer_at_least(transfer_bytes),
98  boost::bind(&AsyncReadBuffer::callback, this,
99  boost::asio::placeholders::error,
100  boost::asio::placeholders::bytes_transferred));
101  }
102  else
103  {
104  // We have enough in the buffer already, can fill the request without going to hardware.
106  }
107  }
108 
109 private:
110  void reset()
111  {
112  read_index_ = 0;
113  write_index_ = 0;
114  }
115 
116  inline size_t bytesAvailable()
117  {
118  return write_index_ - read_index_;
119  }
120 
121  inline size_t bytesHeadroom()
122  {
123  return mem_.size() - write_index_;
124  }
125 
130  void callback(const boost::system::error_code& error, size_t bytes_transferred)
131  {
132  if (error)
133  {
135  read_success_callback_.clear();
136  ROS_DEBUG_STREAM_NAMED("async_read", "Read operation failed with: " << error);
137 
138  if (error == boost::asio::error::operation_aborted)
139  {
140  // Special case for operation_aborted. The abort callback comes when the owning Session
141  // is in the middle of teardown, which means the callback is no longer valid.
142  }
143  else
144  {
145  error_callback_(error);
146  }
147  return;
148  }
149 
150  write_index_ += bytes_transferred;
151  ROS_DEBUG_STREAM_NAMED("async_read", "Successfully read " << bytes_transferred << " byte(s), now " << bytesAvailable() << " available.");
153  }
154 
160  {
161  ROS_DEBUG_STREAM_NAMED("async_read", "Invoking success callback with buffer of requested size " <<
162  read_requested_bytes_ << " byte(s).");
163 
165  read_index_ += read_requested_bytes_;
166 
167  // Post the callback rather than executing it here so, so that we have a chance to do the cleanup
168  // below prior to it actually getting run, in the event that the callback queues up another read.
169  stream_.get_io_service().post(boost::bind(read_success_callback_, stream));
170 
171  // Resetting these values clears our state so that we know there isn't a callback pending.
173  read_success_callback_.clear();
174 
175  if (bytesAvailable() == 0)
176  {
177  ROS_DEBUG_STREAM_NAMED("async_read", "Buffer is empty, resetting indexes to the beginning.");
178  reset();
179  }
180  }
181 
182  AsyncReadStream& stream_;
183  std::vector<uint8_t> mem_;
184 
185  size_t write_index_;
186  size_t read_index_;
187  boost::function<void(const boost::system::error_code&)> error_callback_;
188 
189  boost::function<void(ros::serialization::IStream&)> read_success_callback_;
191 };
192 
193 } // namespace
194 
195 #endif // ROSSERIAL_SERVER_ASYNC_READ_BUFFER_H
void callback(const boost::system::error_code &error, size_t bytes_transferred)
The internal callback which is called by the boost::asio::async_read invocation in the public read me...
#define ROS_DEBUG_STREAM_NAMED(name, args)
#define ROS_ERROR_STREAM_NAMED(name, args)
AsyncReadBuffer(AsyncReadStream &s, size_t capacity, boost::function< void(const boost::system::error_code &)> error_callback)
boost::function< void(ros::serialization::IStream &)> read_success_callback_
#define ROS_ASSERT_MSG(cond,...)
boost::function< void(const boost::system::error_code &)> error_callback_
void read(size_t requested_bytes, boost::function< void(ros::serialization::IStream &)> callback)
Commands a fixed number of bytes from the buffer. This may be fulfilled from existing buffer content...
void callSuccessCallback()
Calls the user&#39;s callback. This is a separate function because it gets called from two places...


rosserial_server
Author(s): Mike Purvis
autogenerated on Mon Jun 10 2019 14:53:36