Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029 #ifndef UBLOX_GPS_ASYNC_WORKER_H
00030 #define UBLOX_GPS_ASYNC_WORKER_H
00031
00032 #include <ublox_gps/gps.h>
00033
00034 #include <boost/asio.hpp>
00035 #include <boost/bind.hpp>
00036 #include <boost/format.hpp>
00037 #include <boost/thread.hpp>
00038 #include <boost/thread/condition.hpp>
00039
00040
00041 #include "worker.h"
00042
00043 namespace ublox_gps {
00044
00045 int debug;
00046
00050 template <typename StreamT>
00051 class AsyncWorker : public Worker {
00052 public:
00053 typedef boost::mutex Mutex;
00054 typedef boost::mutex::scoped_lock ScopedLock;
00055
00062 AsyncWorker(boost::shared_ptr<StreamT> stream,
00063 boost::shared_ptr<boost::asio::io_service> io_service,
00064 std::size_t buffer_size = 8192);
00065 virtual ~AsyncWorker();
00066
00071 void setCallback(const Callback& callback) { read_callback_ = callback; }
00072
00077 void setRawDataCallback(const Callback& callback) { write_callback_ = callback; }
00078
00084 bool send(const unsigned char* data, const unsigned int size);
00089 void wait(const boost::posix_time::time_duration& timeout);
00090
00091 bool isOpen() const { return stream_->is_open(); }
00092
00093 protected:
00097 void doRead();
00098
00104 void readEnd(const boost::system::error_code&, std::size_t);
00105
00109 void doWrite();
00110
00114 void doClose();
00115
00116 boost::shared_ptr<StreamT> stream_;
00117 boost::shared_ptr<boost::asio::io_service> io_service_;
00118
00119 Mutex read_mutex_;
00120 boost::condition read_condition_;
00121 std::vector<unsigned char> in_;
00122 std::size_t in_buffer_size_;
00123
00124
00125 Mutex write_mutex_;
00126 boost::condition write_condition_;
00127 std::vector<unsigned char> out_;
00128
00129 boost::shared_ptr<boost::thread> background_thread_;
00130
00131 Callback read_callback_;
00132 Callback write_callback_;
00133
00134 bool stopping_;
00135 };
00136
00137 template <typename StreamT>
00138 AsyncWorker<StreamT>::AsyncWorker(boost::shared_ptr<StreamT> stream,
00139 boost::shared_ptr<boost::asio::io_service> io_service,
00140 std::size_t buffer_size)
00141 : stopping_(false) {
00142 stream_ = stream;
00143 io_service_ = io_service;
00144 in_.resize(buffer_size);
00145 in_buffer_size_ = 0;
00146
00147 out_.reserve(buffer_size);
00148
00149 io_service_->post(boost::bind(&AsyncWorker<StreamT>::doRead, this));
00150 background_thread_.reset(new boost::thread(
00151 boost::bind(&boost::asio::io_service::run, io_service_)));
00152 }
00153
00154 template <typename StreamT>
00155 AsyncWorker<StreamT>::~AsyncWorker() {
00156 io_service_->post(boost::bind(&AsyncWorker<StreamT>::doClose, this));
00157 background_thread_->join();
00158
00159 }
00160
00161 template <typename StreamT>
00162 bool AsyncWorker<StreamT>::send(const unsigned char* data,
00163 const unsigned int size) {
00164 ScopedLock lock(write_mutex_);
00165 if(size == 0) {
00166 ROS_ERROR("Ublox AsyncWorker::send: Size of message to send is 0");
00167 return true;
00168 }
00169
00170 if (out_.capacity() - out_.size() < size) {
00171 ROS_ERROR("Ublox AsyncWorker::send: Output buffer too full to send message");
00172 return false;
00173 }
00174 out_.insert(out_.end(), data, data + size);
00175
00176 io_service_->post(boost::bind(&AsyncWorker<StreamT>::doWrite, this));
00177 return true;
00178 }
00179
00180 template <typename StreamT>
00181 void AsyncWorker<StreamT>::doWrite() {
00182 ScopedLock lock(write_mutex_);
00183
00184 if (out_.size() == 0) {
00185 return;
00186 }
00187
00188 boost::asio::write(*stream_, boost::asio::buffer(out_.data(), out_.size()));
00189
00190 if (debug >= 2) {
00191
00192 std::ostringstream oss;
00193 for (std::vector<unsigned char>::iterator it = out_.begin();
00194 it != out_.end(); ++it)
00195 oss << boost::format("%02x") % static_cast<unsigned int>(*it) << " ";
00196 ROS_DEBUG("U-Blox sent %li bytes: \n%s", out_.size(), oss.str().c_str());
00197 }
00198
00199 out_.clear();
00200 write_condition_.notify_all();
00201 }
00202
00203 template <typename StreamT>
00204 void AsyncWorker<StreamT>::doRead() {
00205 ScopedLock lock(read_mutex_);
00206 stream_->async_read_some(
00207 boost::asio::buffer(in_.data() + in_buffer_size_,
00208 in_.size() - in_buffer_size_),
00209 boost::bind(&AsyncWorker<StreamT>::readEnd, this,
00210 boost::asio::placeholders::error,
00211 boost::asio::placeholders::bytes_transferred));
00212 }
00213
00214 template <typename StreamT>
00215 void AsyncWorker<StreamT>::readEnd(const boost::system::error_code& error,
00216 std::size_t bytes_transfered) {
00217 ScopedLock lock(read_mutex_);
00218 if (error) {
00219 ROS_ERROR("U-Blox ASIO input buffer read error: %s, %li",
00220 error.message().c_str(),
00221 bytes_transfered);
00222 } else if (bytes_transfered > 0) {
00223 in_buffer_size_ += bytes_transfered;
00224
00225 unsigned char *pRawDataStart = &(*(in_.begin() + (in_buffer_size_ - bytes_transfered)));
00226 std::size_t raw_data_stream_size = bytes_transfered;
00227
00228 if (write_callback_)
00229 write_callback_(pRawDataStart, raw_data_stream_size);
00230
00231 if (debug >= 4) {
00232 std::ostringstream oss;
00233 for (std::vector<unsigned char>::iterator it =
00234 in_.begin() + in_buffer_size_ - bytes_transfered;
00235 it != in_.begin() + in_buffer_size_; ++it)
00236 oss << boost::format("%02x") % static_cast<unsigned int>(*it) << " ";
00237 ROS_DEBUG("U-Blox received %li bytes \n%s", bytes_transfered,
00238 oss.str().c_str());
00239 }
00240
00241 if (read_callback_)
00242 read_callback_(in_.data(), in_buffer_size_);
00243
00244 read_condition_.notify_all();
00245 }
00246
00247 if (!stopping_)
00248 io_service_->post(boost::bind(&AsyncWorker<StreamT>::doRead, this));
00249 }
00250
00251 template <typename StreamT>
00252 void AsyncWorker<StreamT>::doClose() {
00253 ScopedLock lock(read_mutex_);
00254 stopping_ = true;
00255 boost::system::error_code error;
00256 stream_->close(error);
00257 if(error)
00258 ROS_ERROR_STREAM(
00259 "Error while closing the AsyncWorker stream: " << error.message());
00260 }
00261
00262 template <typename StreamT>
00263 void AsyncWorker<StreamT>::wait(
00264 const boost::posix_time::time_duration& timeout) {
00265 ScopedLock lock(read_mutex_);
00266 read_condition_.timed_wait(lock, timeout);
00267 }
00268
00269 }
00270
00271 #endif // UBLOX_GPS_ASYNC_WORKER_H