async_worker.h
Go to the documentation of this file.
00001 //=================================================================================================
00002 // Copyright (c) 2012, Johannes Meyer, TU Darmstadt
00003 // All rights reserved.
00004 
00005 // Redistribution and use in source and binary forms, with or without
00006 // modification, are permitted provided that the following conditions are met:
00007 //     * Redistributions of source code must retain the above copyright
00008 //       notice, this list of conditions and the following disclaimer.
00009 //     * Redistributions in binary form must reproduce the above copyright
00010 //       notice, this list of conditions and the following disclaimer in the
00011 //       documentation and/or other materials provided with the distribution.
00012 //     * Neither the name of the Flight Systems and Automatic Control group,
00013 //       TU Darmstadt, nor the names of its contributors may be used to
00014 //       endorse or promote products derived from this software without
00015 //       specific prior written permission.
00016 
00017 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
00018 // ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
00019 // WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
00020 // DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER BE LIABLE FOR ANY
00021 // DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
00022 // (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00023 // LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
00024 // ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
00025 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00026 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00027 //=================================================================================================
00028 
00029 #ifndef UBLOX_GPS_ASYNC_WORKER_H
00030 #define UBLOX_GPS_ASYNC_WORKER_H
00031 
00032 #include <ublox_gps/gps.h>
00033 
00034 #include <boost/thread.hpp>
00035 #include <boost/thread/condition.hpp>
00036 #include <boost/asio.hpp>
00037 #include <boost/bind.hpp>
00038 
00039 #include "worker.h"
00040 
00041 namespace ublox_gps {
00042 
00043 static const int debug = 0;
00044 
00045 template <typename StreamT>
00046 class AsyncWorker : public Worker
00047 {
00048 public:
00049   AsyncWorker(StreamT& stream, boost::asio::io_service& io_service, std::size_t buffer_size = 8192);
00050   virtual ~AsyncWorker();
00051 
00052   void setCallback(const Callback& callback) { read_callback_ = callback; }
00053 
00054   bool send(const unsigned char *data, const unsigned int size);
00055   void wait(const boost::posix_time::time_duration& timeout);
00056 
00057 protected:
00058   void doRead();
00059   void readEnd(const boost::system::error_code&, std::size_t);
00060   void doWrite();
00061   void doClose();
00062 
00063   StreamT& stream_;
00064   boost::asio::io_service& io_service_;
00065 
00066   boost::mutex read_mutex_;
00067   boost::condition read_condition_;
00068   std::vector<unsigned char> in_;
00069   std::size_t in_buffer_size_;
00070 
00071   boost::mutex write_mutex_;
00072   boost::condition write_condition_;
00073   std::vector<unsigned char> out_;
00074 
00075   boost::shared_ptr<boost::thread> background_thread_;
00076   Callback read_callback_;
00077 
00078   bool stopping_;
00079 };
00080 
00081 template <typename StreamT>
00082 AsyncWorker<StreamT>::AsyncWorker(StreamT& stream, boost::asio::io_service& io_service, std::size_t buffer_size)
00083   : stream_(stream)
00084   , io_service_(io_service)
00085   , stopping_(false)
00086 {
00087   in_.resize(buffer_size);
00088   in_buffer_size_ = 0;
00089 
00090   out_.reserve(buffer_size);
00091 
00092   io_service_.post(boost::bind(&AsyncWorker<StreamT>::doRead, this));
00093   background_thread_.reset(new boost::thread(boost::bind(&boost::asio::io_service::run, &io_service_)));
00094 }
00095 
00096 template <typename StreamT>
00097 AsyncWorker<StreamT>::~AsyncWorker()
00098 {
00099   io_service_.post(boost::bind(&AsyncWorker<StreamT>::doClose, this));
00100   background_thread_->join();
00101   io_service_.reset();
00102 }
00103 
00104 template <typename StreamT>
00105 bool AsyncWorker<StreamT>::send(const unsigned char *data, const unsigned int size) {
00106   boost::mutex::scoped_lock lock(write_mutex_);
00107 
00108   if (out_.capacity() - out_.size() < size) return false;
00109   out_.insert(out_.end(), data, data + size);
00110 
00111   io_service_.post(boost::bind(&AsyncWorker<StreamT>::doWrite, this));
00112   return true;
00113 }
00114 
00115 template <typename StreamT>
00116 void AsyncWorker<StreamT>::doRead()
00117 {
00118   // read_mutex_.lock();
00119   stream_.async_read_some(boost::asio::buffer(in_.data() + in_buffer_size_, in_.size() - in_buffer_size_), boost::bind(&AsyncWorker<StreamT>::readEnd, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
00120 }
00121 
00122 template <typename StreamT>
00123 void AsyncWorker<StreamT>::readEnd(const boost::system::error_code& error, std::size_t bytes_transfered)
00124 {
00125   if (error) {
00126     // do something
00127 
00128   } else if (bytes_transfered > 0) {
00129     in_buffer_size_ += bytes_transfered;
00130 
00131     if (debug >= 4) {
00132       std::cout << "received " << bytes_transfered << " bytes" << std::endl;
00133       for(std::vector<unsigned char>::iterator it = in_.begin() + in_buffer_size_ - bytes_transfered; it != in_.begin() + in_buffer_size_; ++it) std::cout << std::hex << static_cast<unsigned int>(*it) << " ";
00134       std::cout << std::dec << std::endl;
00135     }
00136 
00137     if (read_callback_) read_callback_(in_.data(), in_buffer_size_);
00138 
00139     read_condition_.notify_all();
00140   }
00141 
00142   // read_mutex_.unlock();
00143   if (!stopping_) io_service_.post(boost::bind(&AsyncWorker<StreamT>::doRead, this));
00144 }
00145 
00146 template <typename StreamT>
00147 void AsyncWorker<StreamT>::doWrite()
00148 {
00149   boost::mutex::scoped_lock lock(write_mutex_);
00150   if (out_.size() == 0) return;
00151 
00152   boost::asio::write(stream_, boost::asio::buffer(out_.data(), out_.size()));
00153 
00154   if (debug >= 2) {
00155     std::cout << "sent " << out_.size() << " bytes:" << std::endl;
00156     for(std::vector<unsigned char>::iterator it = out_.begin(); it != out_.end(); ++it) std::cout << std::hex << static_cast<unsigned int>(*it) << " ";
00157     std::cout << std::dec << std::endl;
00158   }
00159   out_.clear();
00160   write_condition_.notify_all();
00161 }
00162 
00163 template <typename StreamT>
00164 void AsyncWorker<StreamT>::doClose()
00165 {
00166   stopping_ = true;
00167   boost::system::error_code error;
00168   stream_.cancel(error);
00169 }
00170 
00171 template <typename StreamT>
00172 void AsyncWorker<StreamT>::wait(const boost::posix_time::time_duration &timeout)
00173 {
00174   boost::mutex::scoped_lock lock(read_mutex_);
00175   read_condition_.timed_wait(lock, timeout);
00176 }
00177 
00178 } // namespace ublox_gps
00179 
00180 #endif // UBLOX_GPS_ASYNC_WORKER_H
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends


ublox_gps
Author(s): Johannes Meyer
autogenerated on Mon Jul 15 2013 16:56:50