$search
00001 //================================================================================================= 00002 // Copyright (c) 2012, Johannes Meyer, TU Darmstadt 00003 // All rights reserved. 00004 00005 // Redistribution and use in source and binary forms, with or without 00006 // modification, are permitted provided that the following conditions are met: 00007 // * Redistributions of source code must retain the above copyright 00008 // notice, this list of conditions and the following disclaimer. 00009 // * Redistributions in binary form must reproduce the above copyright 00010 // notice, this list of conditions and the following disclaimer in the 00011 // documentation and/or other materials provided with the distribution. 00012 // * Neither the name of the Flight Systems and Automatic Control group, 00013 // TU Darmstadt, nor the names of its contributors may be used to 00014 // endorse or promote products derived from this software without 00015 // specific prior written permission. 00016 00017 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND 00018 // ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 00019 // WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 00020 // DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER BE LIABLE FOR ANY 00021 // DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES 00022 // (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00023 // LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND 00024 // ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 00025 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 00026 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 00027 //================================================================================================= 00028 00029 #ifndef UBLOX_GPS_ASYNC_WORKER_H 00030 #define UBLOX_GPS_ASYNC_WORKER_H 00031 00032 #include <ublox_gps/gps.h> 00033 00034 #include <boost/thread.hpp> 00035 #include <boost/thread/condition.hpp> 00036 #include <boost/asio.hpp> 00037 #include <boost/bind.hpp> 00038 00039 #include "worker.h" 00040 00041 namespace ublox_gps { 00042 00043 static const int debug = 0; 00044 00045 template <typename StreamT> 00046 class AsyncWorker : public Worker 00047 { 00048 public: 00049 AsyncWorker(StreamT& stream, boost::asio::io_service& io_service, std::size_t buffer_size = 8192); 00050 virtual ~AsyncWorker(); 00051 00052 void setCallback(const Callback& callback) { read_callback_ = callback; } 00053 00054 bool send(const unsigned char *data, const unsigned int size); 00055 void wait(const boost::posix_time::time_duration& timeout); 00056 00057 protected: 00058 void doRead(); 00059 void readEnd(const boost::system::error_code&, std::size_t); 00060 void doWrite(); 00061 void doClose(); 00062 00063 StreamT& stream_; 00064 boost::asio::io_service& io_service_; 00065 00066 boost::mutex read_mutex_; 00067 boost::condition read_condition_; 00068 std::vector<unsigned char> in_; 00069 std::size_t in_buffer_size_; 00070 00071 boost::mutex write_mutex_; 00072 boost::condition write_condition_; 00073 std::vector<unsigned char> out_; 00074 00075 boost::shared_ptr<boost::thread> background_thread_; 00076 Callback read_callback_; 00077 00078 bool stopping_; 00079 }; 00080 00081 template <typename StreamT> 00082 AsyncWorker<StreamT>::AsyncWorker(StreamT& stream, boost::asio::io_service& io_service, std::size_t buffer_size) 00083 : stream_(stream) 00084 , io_service_(io_service) 00085 , stopping_(false) 00086 { 00087 in_.resize(buffer_size); 00088 in_buffer_size_ = 0; 00089 00090 out_.reserve(buffer_size); 00091 00092 io_service_.post(boost::bind(&AsyncWorker<StreamT>::doRead, this)); 00093 background_thread_.reset(new boost::thread(boost::bind(&boost::asio::io_service::run, &io_service_))); 00094 } 00095 00096 template <typename StreamT> 00097 AsyncWorker<StreamT>::~AsyncWorker() 00098 { 00099 io_service_.post(boost::bind(&AsyncWorker<StreamT>::doClose, this)); 00100 background_thread_->join(); 00101 io_service_.reset(); 00102 } 00103 00104 template <typename StreamT> 00105 bool AsyncWorker<StreamT>::send(const unsigned char *data, const unsigned int size) { 00106 boost::mutex::scoped_lock lock(write_mutex_); 00107 00108 if (out_.capacity() - out_.size() < size) return false; 00109 out_.insert(out_.end(), data, data + size); 00110 00111 io_service_.post(boost::bind(&AsyncWorker<StreamT>::doWrite, this)); 00112 return true; 00113 } 00114 00115 template <typename StreamT> 00116 void AsyncWorker<StreamT>::doRead() 00117 { 00118 // read_mutex_.lock(); 00119 stream_.async_read_some(boost::asio::buffer(in_.data() + in_buffer_size_, in_.size() - in_buffer_size_), boost::bind(&AsyncWorker<StreamT>::readEnd, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); 00120 } 00121 00122 template <typename StreamT> 00123 void AsyncWorker<StreamT>::readEnd(const boost::system::error_code& error, std::size_t bytes_transfered) 00124 { 00125 if (error) { 00126 // do something 00127 00128 } else if (bytes_transfered > 0) { 00129 in_buffer_size_ += bytes_transfered; 00130 00131 if (debug >= 4) { 00132 std::cout << "received " << bytes_transfered << " bytes" << std::endl; 00133 for(std::vector<unsigned char>::iterator it = in_.begin() + in_buffer_size_ - bytes_transfered; it != in_.begin() + in_buffer_size_; ++it) std::cout << std::hex << static_cast<unsigned int>(*it) << " "; 00134 std::cout << std::dec << std::endl; 00135 } 00136 00137 if (read_callback_) read_callback_(in_.data(), in_buffer_size_); 00138 00139 read_condition_.notify_all(); 00140 } 00141 00142 // read_mutex_.unlock(); 00143 if (!stopping_) io_service_.post(boost::bind(&AsyncWorker<StreamT>::doRead, this)); 00144 } 00145 00146 template <typename StreamT> 00147 void AsyncWorker<StreamT>::doWrite() 00148 { 00149 boost::mutex::scoped_lock lock(write_mutex_); 00150 if (out_.size() == 0) return; 00151 00152 boost::asio::write(stream_, boost::asio::buffer(out_.data(), out_.size())); 00153 00154 if (debug >= 2) { 00155 std::cout << "sent " << out_.size() << " bytes:" << std::endl; 00156 for(std::vector<unsigned char>::iterator it = out_.begin(); it != out_.end(); ++it) std::cout << std::hex << static_cast<unsigned int>(*it) << " "; 00157 std::cout << std::dec << std::endl; 00158 } 00159 out_.clear(); 00160 write_condition_.notify_all(); 00161 } 00162 00163 template <typename StreamT> 00164 void AsyncWorker<StreamT>::doClose() 00165 { 00166 stopping_ = true; 00167 boost::system::error_code error; 00168 stream_.cancel(error); 00169 } 00170 00171 template <typename StreamT> 00172 void AsyncWorker<StreamT>::wait(const boost::posix_time::time_duration &timeout) 00173 { 00174 boost::mutex::scoped_lock lock(read_mutex_); 00175 read_condition_.timed_wait(lock, timeout); 00176 } 00177 00178 } // namespace ublox_gps 00179 00180 #endif // UBLOX_GPS_ASYNC_WORKER_H