Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029 #ifndef UBLOX_GPS_ASYNC_WORKER_H
00030 #define UBLOX_GPS_ASYNC_WORKER_H
00031
00032 #include <ublox_gps/gps.h>
00033
00034 #include <boost/thread.hpp>
00035 #include <boost/thread/condition.hpp>
00036 #include <boost/asio.hpp>
00037 #include <boost/bind.hpp>
00038
00039 #include "worker.h"
00040
00041 namespace ublox_gps {
00042
00043 static const int debug = 0;
00044
00045 template <typename StreamT>
00046 class AsyncWorker : public Worker
00047 {
00048 public:
00049 AsyncWorker(StreamT& stream, boost::asio::io_service& io_service, std::size_t buffer_size = 8192);
00050 virtual ~AsyncWorker();
00051
00052 void setCallback(const Callback& callback) { read_callback_ = callback; }
00053
00054 bool send(const unsigned char *data, const unsigned int size);
00055 void wait(const boost::posix_time::time_duration& timeout);
00056
00057 protected:
00058 void doRead();
00059 void readEnd(const boost::system::error_code&, std::size_t);
00060 void doWrite();
00061 void doClose();
00062
00063 StreamT& stream_;
00064 boost::asio::io_service& io_service_;
00065
00066 boost::mutex read_mutex_;
00067 boost::condition read_condition_;
00068 std::vector<unsigned char> in_;
00069 std::size_t in_buffer_size_;
00070
00071 boost::mutex write_mutex_;
00072 boost::condition write_condition_;
00073 std::vector<unsigned char> out_;
00074
00075 boost::shared_ptr<boost::thread> background_thread_;
00076 Callback read_callback_;
00077
00078 bool stopping_;
00079 };
00080
00081 template <typename StreamT>
00082 AsyncWorker<StreamT>::AsyncWorker(StreamT& stream, boost::asio::io_service& io_service, std::size_t buffer_size)
00083 : stream_(stream)
00084 , io_service_(io_service)
00085 , stopping_(false)
00086 {
00087 in_.resize(buffer_size);
00088 in_buffer_size_ = 0;
00089
00090 out_.reserve(buffer_size);
00091
00092 io_service_.post(boost::bind(&AsyncWorker<StreamT>::doRead, this));
00093 background_thread_.reset(new boost::thread(boost::bind(&boost::asio::io_service::run, &io_service_)));
00094 }
00095
00096 template <typename StreamT>
00097 AsyncWorker<StreamT>::~AsyncWorker()
00098 {
00099 io_service_.post(boost::bind(&AsyncWorker<StreamT>::doClose, this));
00100 background_thread_->join();
00101 io_service_.reset();
00102 }
00103
00104 template <typename StreamT>
00105 bool AsyncWorker<StreamT>::send(const unsigned char *data, const unsigned int size) {
00106 boost::mutex::scoped_lock lock(write_mutex_);
00107
00108 if (out_.capacity() - out_.size() < size) return false;
00109 out_.insert(out_.end(), data, data + size);
00110
00111 io_service_.post(boost::bind(&AsyncWorker<StreamT>::doWrite, this));
00112 return true;
00113 }
00114
00115 template <typename StreamT>
00116 void AsyncWorker<StreamT>::doRead()
00117 {
00118
00119 stream_.async_read_some(boost::asio::buffer(in_.data() + in_buffer_size_, in_.size() - in_buffer_size_), boost::bind(&AsyncWorker<StreamT>::readEnd, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
00120 }
00121
00122 template <typename StreamT>
00123 void AsyncWorker<StreamT>::readEnd(const boost::system::error_code& error, std::size_t bytes_transfered)
00124 {
00125 if (error) {
00126
00127
00128 } else if (bytes_transfered > 0) {
00129 in_buffer_size_ += bytes_transfered;
00130
00131 if (debug >= 4) {
00132 std::cout << "received " << bytes_transfered << " bytes" << std::endl;
00133 for(std::vector<unsigned char>::iterator it = in_.begin() + in_buffer_size_ - bytes_transfered; it != in_.begin() + in_buffer_size_; ++it) std::cout << std::hex << static_cast<unsigned int>(*it) << " ";
00134 std::cout << std::dec << std::endl;
00135 }
00136
00137 if (read_callback_) read_callback_(in_.data(), in_buffer_size_);
00138
00139 read_condition_.notify_all();
00140 }
00141
00142
00143 if (!stopping_) io_service_.post(boost::bind(&AsyncWorker<StreamT>::doRead, this));
00144 }
00145
00146 template <typename StreamT>
00147 void AsyncWorker<StreamT>::doWrite()
00148 {
00149 boost::mutex::scoped_lock lock(write_mutex_);
00150 if (out_.size() == 0) return;
00151
00152 boost::asio::write(stream_, boost::asio::buffer(out_.data(), out_.size()));
00153
00154 if (debug >= 2) {
00155 std::cout << "sent " << out_.size() << " bytes:" << std::endl;
00156 for(std::vector<unsigned char>::iterator it = out_.begin(); it != out_.end(); ++it) std::cout << std::hex << static_cast<unsigned int>(*it) << " ";
00157 std::cout << std::dec << std::endl;
00158 }
00159 out_.clear();
00160 write_condition_.notify_all();
00161 }
00162
00163 template <typename StreamT>
00164 void AsyncWorker<StreamT>::doClose()
00165 {
00166 stopping_ = true;
00167 boost::system::error_code error;
00168 stream_.cancel(error);
00169 }
00170
00171 template <typename StreamT>
00172 void AsyncWorker<StreamT>::wait(const boost::posix_time::time_duration &timeout)
00173 {
00174 boost::mutex::scoped_lock lock(read_mutex_);
00175 read_condition_.timed_wait(lock, timeout);
00176 }
00177
00178 }
00179
00180 #endif // UBLOX_GPS_ASYNC_WORKER_H