async_worker.h
Go to the documentation of this file.
1 //==============================================================================
2 // Copyright (c) 2012, Johannes Meyer, TU Darmstadt
3 // All rights reserved.
4 
5 // Redistribution and use in source and binary forms, with or without
6 // modification, are permitted provided that the following conditions are met:
7 // * Redistributions of source code must retain the above copyright
8 // notice, this list of conditions and the following disclaimer.
9 // * Redistributions in binary form must reproduce the above copyright
10 // notice, this list of conditions and the following disclaimer in the
11 // documentation and/or other materials provided with the distribution.
12 // * Neither the name of the Flight Systems and Automatic Control group,
13 // TU Darmstadt, nor the names of its contributors may be used to
14 // endorse or promote products derived from this software without
15 // specific prior written permission.
16 
17 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 // ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER BE LIABLE FOR ANY
21 // DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
22 // (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
23 // LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
24 // ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
26 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 //==============================================================================
28 
29 #ifndef UBLOX_GPS_ASYNC_WORKER_H
30 #define UBLOX_GPS_ASYNC_WORKER_H
31 
32 #include <ublox_gps/gps.h>
33 
34 #include <boost/asio.hpp>
35 #include <boost/bind.hpp>
36 #include <boost/format.hpp>
37 #include <boost/thread.hpp>
38 #include <boost/thread/condition.hpp>
39 
40 
41 #include "worker.h"
42 
43 namespace ublox_gps {
44 
45 int debug;
46 
50 template <typename StreamT>
51 class AsyncWorker : public Worker {
52  public:
53  typedef boost::mutex Mutex;
54  typedef boost::mutex::scoped_lock ScopedLock;
55 
64  std::size_t buffer_size = 8192);
65  virtual ~AsyncWorker();
66 
71  void setCallback(const Callback& callback) { read_callback_ = callback; }
72 
77  void setRawDataCallback(const Callback& callback) { write_callback_ = callback; }
78 
84  bool send(const unsigned char* data, const unsigned int size);
89  void wait(const boost::posix_time::time_duration& timeout);
90 
91  bool isOpen() const { return stream_->is_open(); }
92 
93  protected:
97  void doRead();
98 
104  void readEnd(const boost::system::error_code&, std::size_t);
105 
109  void doWrite();
110 
114  void doClose();
115 
118 
119  Mutex read_mutex_;
120  boost::condition read_condition_;
121  std::vector<unsigned char> in_;
122  std::size_t in_buffer_size_;
123 
125  Mutex write_mutex_;
126  boost::condition write_condition_;
127  std::vector<unsigned char> out_;
128 
133 
134  bool stopping_;
135 };
136 
137 template <typename StreamT>
140  std::size_t buffer_size)
141  : stopping_(false) {
142  stream_ = stream;
143  io_service_ = io_service;
144  in_.resize(buffer_size);
145  in_buffer_size_ = 0;
146 
147  out_.reserve(buffer_size);
148 
149  io_service_->post(boost::bind(&AsyncWorker<StreamT>::doRead, this));
150  background_thread_.reset(new boost::thread(
151  boost::bind(&boost::asio::io_service::run, io_service_)));
152 }
153 
154 template <typename StreamT>
156  io_service_->post(boost::bind(&AsyncWorker<StreamT>::doClose, this));
157  background_thread_->join();
158  //io_service_->reset();
159 }
160 
161 template <typename StreamT>
162 bool AsyncWorker<StreamT>::send(const unsigned char* data,
163  const unsigned int size) {
164  ScopedLock lock(write_mutex_);
165  if(size == 0) {
166  ROS_ERROR("Ublox AsyncWorker::send: Size of message to send is 0");
167  return true;
168  }
169 
170  if (out_.capacity() - out_.size() < size) {
171  ROS_ERROR("Ublox AsyncWorker::send: Output buffer too full to send message");
172  return false;
173  }
174  out_.insert(out_.end(), data, data + size);
175 
176  io_service_->post(boost::bind(&AsyncWorker<StreamT>::doWrite, this));
177  return true;
178 }
179 
180 template <typename StreamT>
182  ScopedLock lock(write_mutex_);
183  // Do nothing if out buffer is empty
184  if (out_.size() == 0) {
185  return;
186  }
187  // Write all the data in the out buffer
188  boost::asio::write(*stream_, boost::asio::buffer(out_.data(), out_.size()));
189 
190  if (debug >= 2) {
191  // Print the data that was sent
192  std::ostringstream oss;
193  for (std::vector<unsigned char>::iterator it = out_.begin();
194  it != out_.end(); ++it)
195  oss << boost::format("%02x") % static_cast<unsigned int>(*it) << " ";
196  ROS_DEBUG("U-Blox sent %li bytes: \n%s", out_.size(), oss.str().c_str());
197  }
198  // Clear the buffer & unlock
199  out_.clear();
200  write_condition_.notify_all();
201 }
202 
203 template <typename StreamT>
205  ScopedLock lock(read_mutex_);
206  stream_->async_read_some(
207  boost::asio::buffer(in_.data() + in_buffer_size_,
208  in_.size() - in_buffer_size_),
209  boost::bind(&AsyncWorker<StreamT>::readEnd, this,
210  boost::asio::placeholders::error,
211  boost::asio::placeholders::bytes_transferred));
212 }
213 
214 template <typename StreamT>
215 void AsyncWorker<StreamT>::readEnd(const boost::system::error_code& error,
216  std::size_t bytes_transfered) {
217  ScopedLock lock(read_mutex_);
218  if (error) {
219  ROS_ERROR("U-Blox ASIO input buffer read error: %s, %li",
220  error.message().c_str(),
221  bytes_transfered);
222  } else if (bytes_transfered > 0) {
223  in_buffer_size_ += bytes_transfered;
224 
225  unsigned char *pRawDataStart = &(*(in_.begin() + (in_buffer_size_ - bytes_transfered)));
226  std::size_t raw_data_stream_size = bytes_transfered;
227 
228  if (write_callback_)
229  write_callback_(pRawDataStart, raw_data_stream_size);
230 
231  if (debug >= 4) {
232  std::ostringstream oss;
233  for (std::vector<unsigned char>::iterator it =
234  in_.begin() + in_buffer_size_ - bytes_transfered;
235  it != in_.begin() + in_buffer_size_; ++it)
236  oss << boost::format("%02x") % static_cast<unsigned int>(*it) << " ";
237  ROS_DEBUG("U-Blox received %li bytes \n%s", bytes_transfered,
238  oss.str().c_str());
239  }
240 
241  if (read_callback_)
243 
244  read_condition_.notify_all();
245  }
246 
247  if (!stopping_)
248  io_service_->post(boost::bind(&AsyncWorker<StreamT>::doRead, this));
249 }
250 
251 template <typename StreamT>
253  ScopedLock lock(read_mutex_);
254  stopping_ = true;
255  boost::system::error_code error;
256  stream_->close(error);
257  if(error)
259  "Error while closing the AsyncWorker stream: " << error.message());
260 }
261 
262 template <typename StreamT>
264  const boost::posix_time::time_duration& timeout) {
265  ScopedLock lock(read_mutex_);
266  read_condition_.timed_wait(lock, timeout);
267 }
268 
269 } // namespace ublox_gps
270 
271 #endif // UBLOX_GPS_ASYNC_WORKER_H
int debug
Used to determine which debug messages to display.
Definition: async_worker.h:45
Mutex write_mutex_
Lock for the output buffer.
Definition: async_worker.h:125
bool send(const unsigned char *data, const unsigned int size)
Send the data bytes via the I/O stream.
Definition: async_worker.h:162
Callback write_callback_
Callback function to handle raw data.
Definition: async_worker.h:132
boost::mutex::scoped_lock ScopedLock
Definition: async_worker.h:54
data
boost::function< void(unsigned char *, std::size_t &)> Callback
Definition: worker.h:42
boost::shared_ptr< boost::asio::io_service > io_service_
The I/O service.
Definition: async_worker.h:117
void readEnd(const boost::system::error_code &, std::size_t)
Process messages read from the input stream.
Definition: async_worker.h:215
bool stopping_
Whether or not the I/O service is closed.
Definition: async_worker.h:134
boost::shared_ptr< StreamT > stream_
The I/O stream.
Definition: async_worker.h:116
bool isOpen() const
Whether or not the I/O stream is open.
Definition: async_worker.h:91
void doWrite()
Send all the data in the output buffer.
Definition: async_worker.h:181
Handles I/O reading and writing.
Definition: worker.h:40
boost::shared_ptr< boost::thread > background_thread_
Definition: async_worker.h:129
void doRead()
Read the input stream.
Definition: async_worker.h:204
boost::condition read_condition_
Definition: async_worker.h:120
std::size_t in_buffer_size_
Definition: async_worker.h:122
AsyncWorker(boost::shared_ptr< StreamT > stream, boost::shared_ptr< boost::asio::io_service > io_service, std::size_t buffer_size=8192)
Construct an Asynchronous I/O worker.
Definition: async_worker.h:138
Handles Asynchronous I/O reading and writing.
Definition: async_worker.h:51
void doClose()
Close the I/O stream.
Definition: async_worker.h:252
std::vector< unsigned char > in_
The input buffer.
Definition: async_worker.h:121
void setRawDataCallback(const Callback &callback)
Set the callback function which handles raw data.
Definition: async_worker.h:77
void setCallback(const Callback &callback)
Set the callback function which handles input messages.
Definition: async_worker.h:71
boost::condition write_condition_
Definition: async_worker.h:126
Mutex read_mutex_
Lock for the input buffer.
Definition: async_worker.h:119
#define ROS_ERROR_STREAM(args)
Callback read_callback_
Callback function to handle received messages.
Definition: async_worker.h:131
void wait(const boost::posix_time::time_duration &timeout)
Wait for incoming messages.
Definition: async_worker.h:263
#define ROS_ERROR(...)
std::vector< unsigned char > out_
The output buffer.
Definition: async_worker.h:127
#define ROS_DEBUG(...)


ublox_gps
Author(s): Johannes Meyer
autogenerated on Thu Jan 28 2021 03:13:52