Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029 #ifndef UBLOX_GPS_ASYNC_WORKER_H
00030 #define UBLOX_GPS_ASYNC_WORKER_H
00031
00032 #include <ublox_gps/gps.h>
00033
00034 #include <boost/asio.hpp>
00035 #include <boost/bind.hpp>
00036 #include <boost/format.hpp>
00037 #include <boost/thread.hpp>
00038 #include <boost/thread/condition.hpp>
00039
00040
00041 #include "worker.h"
00042
00043 namespace ublox_gps {
00044
00045 int debug;
00046
00050 template <typename StreamT>
00051 class AsyncWorker : public Worker {
00052 public:
00053 typedef boost::mutex Mutex;
00054 typedef boost::mutex::scoped_lock ScopedLock;
00055
00062 AsyncWorker(boost::shared_ptr<StreamT> stream,
00063 boost::shared_ptr<boost::asio::io_service> io_service,
00064 std::size_t buffer_size = 8192);
00065 virtual ~AsyncWorker();
00066
00071 void setCallback(const Callback& callback) { read_callback_ = callback; }
00072
00078 bool send(const unsigned char* data, const unsigned int size);
00083 void wait(const boost::posix_time::time_duration& timeout);
00084
00085 bool isOpen() const { return stream_->is_open(); }
00086
00087 protected:
00091 void doRead();
00092
00098 void readEnd(const boost::system::error_code&, std::size_t);
00099
00103 void doWrite();
00104
00108 void doClose();
00109
00110 boost::shared_ptr<StreamT> stream_;
00111 boost::shared_ptr<boost::asio::io_service> io_service_;
00112
00113 Mutex read_mutex_;
00114 boost::condition read_condition_;
00115 std::vector<unsigned char> in_;
00116 std::size_t in_buffer_size_;
00117
00118
00119 Mutex write_mutex_;
00120 boost::condition write_condition_;
00121 std::vector<unsigned char> out_;
00122
00123 boost::shared_ptr<boost::thread> background_thread_;
00124
00125 Callback read_callback_;
00126
00127 bool stopping_;
00128 };
00129
00130 template <typename StreamT>
00131 AsyncWorker<StreamT>::AsyncWorker(boost::shared_ptr<StreamT> stream,
00132 boost::shared_ptr<boost::asio::io_service> io_service,
00133 std::size_t buffer_size)
00134 : stopping_(false) {
00135 stream_ = stream;
00136 io_service_ = io_service;
00137 in_.resize(buffer_size);
00138 in_buffer_size_ = 0;
00139
00140 out_.reserve(buffer_size);
00141
00142 io_service_->post(boost::bind(&AsyncWorker<StreamT>::doRead, this));
00143 background_thread_.reset(new boost::thread(
00144 boost::bind(&boost::asio::io_service::run, io_service_)));
00145 }
00146
00147 template <typename StreamT>
00148 AsyncWorker<StreamT>::~AsyncWorker() {
00149 io_service_->post(boost::bind(&AsyncWorker<StreamT>::doClose, this));
00150 background_thread_->join();
00151 io_service_->reset();
00152 }
00153
00154 template <typename StreamT>
00155 bool AsyncWorker<StreamT>::send(const unsigned char* data,
00156 const unsigned int size) {
00157 ScopedLock lock(write_mutex_);
00158 if(size == 0) {
00159 ROS_ERROR("Ublox AsyncWorker::send: Size of message to send is 0");
00160 return true;
00161 }
00162
00163 if (out_.capacity() - out_.size() < size) {
00164 ROS_ERROR("Ublox AsyncWorker::send: Output buffer too full to send message");
00165 return false;
00166 }
00167 out_.insert(out_.end(), data, data + size);
00168
00169 io_service_->post(boost::bind(&AsyncWorker<StreamT>::doWrite, this));
00170 return true;
00171 }
00172
00173 template <typename StreamT>
00174 void AsyncWorker<StreamT>::doWrite() {
00175 ScopedLock lock(write_mutex_);
00176
00177 if (out_.size() == 0) {
00178 return;
00179 }
00180
00181 boost::asio::write(*stream_, boost::asio::buffer(out_.data(), out_.size()));
00182
00183 if (debug >= 2) {
00184
00185 std::ostringstream oss;
00186 for (std::vector<unsigned char>::iterator it = out_.begin();
00187 it != out_.end(); ++it)
00188 oss << boost::format("%02x") % static_cast<unsigned int>(*it) << " ";
00189 ROS_DEBUG("U-Blox sent %li bytes: \n%s", out_.size(), oss.str().c_str());
00190 }
00191
00192 out_.clear();
00193 write_condition_.notify_all();
00194 }
00195
00196 template <typename StreamT>
00197 void AsyncWorker<StreamT>::doRead() {
00198 ScopedLock lock(read_mutex_);
00199 stream_->async_read_some(
00200 boost::asio::buffer(in_.data() + in_buffer_size_,
00201 in_.size() - in_buffer_size_),
00202 boost::bind(&AsyncWorker<StreamT>::readEnd, this,
00203 boost::asio::placeholders::error,
00204 boost::asio::placeholders::bytes_transferred));
00205 }
00206
00207 template <typename StreamT>
00208 void AsyncWorker<StreamT>::readEnd(const boost::system::error_code& error,
00209 std::size_t bytes_transfered) {
00210 ScopedLock lock(read_mutex_);
00211 if (error) {
00212 ROS_ERROR("U-Blox ASIO input buffer read error: %s, %li",
00213 error.message().c_str(),
00214 bytes_transfered);
00215 } else if (bytes_transfered > 0) {
00216 in_buffer_size_ += bytes_transfered;
00217
00218 if (debug >= 4) {
00219 std::ostringstream oss;
00220 for (std::vector<unsigned char>::iterator it =
00221 in_.begin() + in_buffer_size_ - bytes_transfered;
00222 it != in_.begin() + in_buffer_size_; ++it)
00223 oss << boost::format("%02x") % static_cast<unsigned int>(*it) << " ";
00224 ROS_DEBUG("U-Blox received %li bytes \n%s", bytes_transfered,
00225 oss.str().c_str());
00226 }
00227
00228 if (read_callback_)
00229 read_callback_(in_.data(), in_buffer_size_);
00230
00231 read_condition_.notify_all();
00232 }
00233
00234 if (!stopping_)
00235 io_service_->post(boost::bind(&AsyncWorker<StreamT>::doRead, this));
00236 }
00237
00238 template <typename StreamT>
00239 void AsyncWorker<StreamT>::doClose() {
00240 ScopedLock lock(read_mutex_);
00241 stopping_ = true;
00242 boost::system::error_code error;
00243 stream_->cancel(error);
00244 }
00245
00246 template <typename StreamT>
00247 void AsyncWorker<StreamT>::wait(
00248 const boost::posix_time::time_duration& timeout) {
00249 ScopedLock lock(read_mutex_);
00250 read_condition_.timed_wait(lock, timeout);
00251 }
00252
00253 }
00254
00255 #endif // UBLOX_GPS_ASYNC_WORKER_H