async_read_buffer.h
Go to the documentation of this file.
1 
34 #ifndef ROSSERIAL_SERVER_ASYNC_READ_BUFFER_H
35 #define ROSSERIAL_SERVER_ASYNC_READ_BUFFER_H
36 
37 #include <boost/bind.hpp>
38 #include <boost/asio.hpp>
39 #include <boost/function.hpp>
40 
41 #include <ros/ros.h>
42 
43 // ssize_t is POSIX-only type. Use make_signed for portable code.
44 #include <cstdint> // size_t
45 #include <type_traits> // std::make_signed
46 typedef std::make_signed<size_t>::type signed_size_t;
47 
49 {
50 
51 template<typename AsyncReadStream>
53 {
54 public:
55  AsyncReadBuffer(AsyncReadStream& s, size_t capacity,
56  boost::function<void(const boost::system::error_code&)> error_callback)
57  : stream_(s), read_requested_bytes_(0), error_callback_(error_callback) {
58  reset();
59  mem_.resize(capacity);
60  ROS_ASSERT_MSG(error_callback_, "Bad error callback passed to read buffer.");
61  }
62 
67  void read(size_t requested_bytes, boost::function<void(ros::serialization::IStream&)> callback) {
68  ROS_DEBUG_STREAM_NAMED("async_read", "Buffer read of " << requested_bytes << " bytes, " <<
69  "wi: " << write_index_ << ", ri: " << read_index_);
70 
71  ROS_ASSERT_MSG(read_requested_bytes_ == 0, "Bytes requested is nonzero, is there an operation already pending?");
72  ROS_ASSERT_MSG(callback, "Bad read success callback function.");
74  read_requested_bytes_ = requested_bytes;
75 
76  if (read_requested_bytes_ > mem_.size())
77  {
78  // Insufficient room in the buffer for the requested bytes,
79  ROS_ERROR_STREAM_NAMED("async_read", "Requested to read " << read_requested_bytes_ <<
80  " bytes, but buffer capacity is only " << mem_.size() << ".");
81  error_callback_(boost::system::errc::make_error_code(boost::system::errc::no_buffer_space));
82  return;
83  }
84 
85  // Number of bytes which must be transferred to satisfy the request.
87 
88  if (transfer_bytes > 0)
89  {
90  // If we don't have enough headroom in the buffer, we'll have to shift what's currently in there to make room.
91  if (bytesHeadroom() < transfer_bytes)
92  {
93  memmove(&mem_[0], &mem_[read_index_], bytesAvailable());
95  read_index_ = 0;
96  }
97 
98  // Initiate a read from hardware so that we have enough bytes to fill the user request.
99  ROS_DEBUG_STREAM_NAMED("async_read", "Requesting transfer of at least " << transfer_bytes << " byte(s).");
100  boost::asio::async_read(stream_,
101  boost::asio::buffer(&mem_[write_index_], bytesHeadroom()),
102  boost::asio::transfer_at_least(transfer_bytes),
103  boost::bind(&AsyncReadBuffer::callback, this,
104  boost::asio::placeholders::error,
105  boost::asio::placeholders::bytes_transferred));
106  }
107  else
108  {
109  // We have enough in the buffer already, can fill the request without going to hardware.
111  }
112  }
113 
114 private:
115  void reset()
116  {
117  read_index_ = 0;
118  write_index_ = 0;
119  }
120 
121  inline size_t bytesAvailable()
122  {
123  return write_index_ - read_index_;
124  }
125 
126  inline size_t bytesHeadroom()
127  {
128  return mem_.size() - write_index_;
129  }
130 
135  void callback(const boost::system::error_code& error, size_t bytes_transferred)
136  {
137  if (error)
138  {
140  read_success_callback_.clear();
141  ROS_DEBUG_STREAM_NAMED("async_read", "Read operation failed with: " << error);
142 
143  if (error == boost::asio::error::operation_aborted)
144  {
145  // Special case for operation_aborted. The abort callback comes when the owning Session
146  // is in the middle of teardown, which means the callback is no longer valid.
147  }
148  else
149  {
150  error_callback_(error);
151  }
152  return;
153  }
154 
155  write_index_ += bytes_transferred;
156  ROS_DEBUG_STREAM_NAMED("async_read", "Successfully read " << bytes_transferred << " byte(s), now " << bytesAvailable() << " available.");
158  }
159 
165  {
166  ROS_DEBUG_STREAM_NAMED("async_read", "Invoking success callback with buffer of requested size " <<
167  read_requested_bytes_ << " byte(s).");
168 
170  read_index_ += read_requested_bytes_;
171 
172  // Post the callback rather than executing it here so, so that we have a chance to do the cleanup
173  // below prior to it actually getting run, in the event that the callback queues up another read.
174 #if BOOST_VERSION >= 107000
175  boost::asio::post(stream_.get_executor(), boost::bind(read_success_callback_, stream));
176 #else
177  stream_.get_io_service().post(boost::bind(read_success_callback_, stream));
178 #endif
179 
180  // Resetting these values clears our state so that we know there isn't a callback pending.
182  read_success_callback_.clear();
183 
184  if (bytesAvailable() == 0)
185  {
186  ROS_DEBUG_STREAM_NAMED("async_read", "Buffer is empty, resetting indexes to the beginning.");
187  reset();
188  }
189  }
190 
191  AsyncReadStream& stream_;
192  std::vector<uint8_t> mem_;
193 
194  size_t write_index_;
195  size_t read_index_;
196  boost::function<void(const boost::system::error_code&)> error_callback_;
197 
198  boost::function<void(ros::serialization::IStream&)> read_success_callback_;
200 };
201 
202 } // namespace
203 
204 #endif // ROSSERIAL_SERVER_ASYNC_READ_BUFFER_H
void callback(const boost::system::error_code &error, size_t bytes_transferred)
The internal callback which is called by the boost::asio::async_read invocation in the public read me...
#define ROS_DEBUG_STREAM_NAMED(name, args)
#define ROS_ERROR_STREAM_NAMED(name, args)
std::make_signed< size_t >::type signed_size_t
AsyncReadBuffer(AsyncReadStream &s, size_t capacity, boost::function< void(const boost::system::error_code &)> error_callback)
boost::function< void(ros::serialization::IStream &)> read_success_callback_
#define ROS_ASSERT_MSG(cond,...)
boost::function< void(const boost::system::error_code &)> error_callback_
void read(size_t requested_bytes, boost::function< void(ros::serialization::IStream &)> callback)
Commands a fixed number of bytes from the buffer. This may be fulfilled from existing buffer content...
void callSuccessCallback()
Calls the user&#39;s callback. This is a separate function because it gets called from two places...


rosserial_server
Author(s): Mike Purvis
autogenerated on Mon Feb 28 2022 23:35:31