async_worker.h
Go to the documentation of this file.
00001 //==============================================================================
00002 // Copyright (c) 2012, Johannes Meyer, TU Darmstadt
00003 // All rights reserved.
00004 
00005 // Redistribution and use in source and binary forms, with or without
00006 // modification, are permitted provided that the following conditions are met:
00007 //     * Redistributions of source code must retain the above copyright
00008 //       notice, this list of conditions and the following disclaimer.
00009 //     * Redistributions in binary form must reproduce the above copyright
00010 //       notice, this list of conditions and the following disclaimer in the
00011 //       documentation and/or other materials provided with the distribution.
00012 //     * Neither the name of the Flight Systems and Automatic Control group,
00013 //       TU Darmstadt, nor the names of its contributors may be used to
00014 //       endorse or promote products derived from this software without
00015 //       specific prior written permission.
00016 
00017 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 
00018 // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
00019 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
00020 // ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER BE LIABLE FOR ANY
00021 // DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
00022 // (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00023 // LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
00024 // ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
00025 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00026 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00027 //==============================================================================
00028 
00029 #ifndef UBLOX_GPS_ASYNC_WORKER_H
00030 #define UBLOX_GPS_ASYNC_WORKER_H
00031 
00032 #include <ublox_gps/gps.h>
00033 
00034 #include <boost/asio.hpp>
00035 #include <boost/bind.hpp>
00036 #include <boost/format.hpp>
00037 #include <boost/thread.hpp>
00038 #include <boost/thread/condition.hpp>
00039 
00040 
00041 #include "worker.h"
00042 
00043 namespace ublox_gps {
00044 
00045 int debug; 
00046 
00050 template <typename StreamT>
00051 class AsyncWorker : public Worker {
00052  public:
00053   typedef boost::mutex Mutex;
00054   typedef boost::mutex::scoped_lock ScopedLock;
00055 
00062   AsyncWorker(boost::shared_ptr<StreamT> stream, 
00063               boost::shared_ptr<boost::asio::io_service> io_service,
00064               std::size_t buffer_size = 8192);
00065   virtual ~AsyncWorker();
00066 
00071   void setCallback(const Callback& callback) { read_callback_ = callback; }
00072 
00078   bool send(const unsigned char* data, const unsigned int size);
00083   void wait(const boost::posix_time::time_duration& timeout);
00084 
00085   bool isOpen() const { return stream_->is_open(); }
00086 
00087  protected:
00091   void doRead();
00092   
00098   void readEnd(const boost::system::error_code&, std::size_t);
00099   
00103   void doWrite();
00104 
00108   void doClose();
00109 
00110   boost::shared_ptr<StreamT> stream_; 
00111   boost::shared_ptr<boost::asio::io_service> io_service_; 
00112 
00113   Mutex read_mutex_; 
00114   boost::condition read_condition_;
00115   std::vector<unsigned char> in_; 
00116   std::size_t in_buffer_size_; 
00117 
00118 
00119   Mutex write_mutex_; 
00120   boost::condition write_condition_;
00121   std::vector<unsigned char> out_; 
00122 
00123   boost::shared_ptr<boost::thread> background_thread_; 
00124 
00125   Callback read_callback_; 
00126 
00127   bool stopping_; 
00128 };
00129 
00130 template <typename StreamT>
00131 AsyncWorker<StreamT>::AsyncWorker(boost::shared_ptr<StreamT> stream,
00132         boost::shared_ptr<boost::asio::io_service> io_service,
00133         std::size_t buffer_size)
00134     : stopping_(false) {
00135   stream_ = stream;
00136   io_service_ = io_service;
00137   in_.resize(buffer_size);
00138   in_buffer_size_ = 0;
00139 
00140   out_.reserve(buffer_size);
00141 
00142   io_service_->post(boost::bind(&AsyncWorker<StreamT>::doRead, this));
00143   background_thread_.reset(new boost::thread(
00144       boost::bind(&boost::asio::io_service::run, io_service_)));
00145 }
00146 
00147 template <typename StreamT>
00148 AsyncWorker<StreamT>::~AsyncWorker() {
00149   io_service_->post(boost::bind(&AsyncWorker<StreamT>::doClose, this));
00150   background_thread_->join();
00151   io_service_->reset();
00152 }
00153 
00154 template <typename StreamT>
00155 bool AsyncWorker<StreamT>::send(const unsigned char* data,
00156                                 const unsigned int size) {
00157   ScopedLock lock(write_mutex_);
00158   if(size == 0) {
00159     ROS_ERROR("Ublox AsyncWorker::send: Size of message to send is 0");
00160     return true;
00161   }
00162 
00163   if (out_.capacity() - out_.size() < size) {
00164     ROS_ERROR("Ublox AsyncWorker::send: Output buffer too full to send message");
00165     return false;
00166   }
00167   out_.insert(out_.end(), data, data + size);
00168 
00169   io_service_->post(boost::bind(&AsyncWorker<StreamT>::doWrite, this));
00170   return true;
00171 }
00172 
00173 template <typename StreamT>
00174 void AsyncWorker<StreamT>::doWrite() {
00175   ScopedLock lock(write_mutex_);
00176   // Do nothing if out buffer is empty
00177   if (out_.size() == 0) {
00178     return;
00179   }
00180   // Write all the data in the out buffer
00181   boost::asio::write(*stream_, boost::asio::buffer(out_.data(), out_.size()));
00182 
00183   if (debug >= 2) {
00184     // Print the data that was sent
00185     std::ostringstream oss;
00186     for (std::vector<unsigned char>::iterator it = out_.begin();
00187          it != out_.end(); ++it)
00188       oss << boost::format("%02x") % static_cast<unsigned int>(*it) << " ";
00189     ROS_DEBUG("U-Blox sent %li bytes: \n%s", out_.size(), oss.str().c_str());
00190   }
00191   // Clear the buffer & unlock
00192   out_.clear();
00193   write_condition_.notify_all();
00194 }
00195 
00196 template <typename StreamT>
00197 void AsyncWorker<StreamT>::doRead() {
00198   ScopedLock lock(read_mutex_);
00199   stream_->async_read_some(
00200       boost::asio::buffer(in_.data() + in_buffer_size_,
00201                           in_.size() - in_buffer_size_),
00202                           boost::bind(&AsyncWorker<StreamT>::readEnd, this,
00203                               boost::asio::placeholders::error,
00204                               boost::asio::placeholders::bytes_transferred));
00205 }
00206 
00207 template <typename StreamT>
00208 void AsyncWorker<StreamT>::readEnd(const boost::system::error_code& error,
00209                                    std::size_t bytes_transfered) {
00210   ScopedLock lock(read_mutex_);
00211   if (error) {
00212     ROS_ERROR("U-Blox ASIO input buffer read error: %s, %li", 
00213               error.message().c_str(), 
00214               bytes_transfered);
00215   } else if (bytes_transfered > 0) {
00216     in_buffer_size_ += bytes_transfered;
00217 
00218     if (debug >= 4) {
00219       std::ostringstream oss;
00220       for (std::vector<unsigned char>::iterator it =
00221                in_.begin() + in_buffer_size_ - bytes_transfered;
00222            it != in_.begin() + in_buffer_size_; ++it)
00223         oss << boost::format("%02x") % static_cast<unsigned int>(*it) << " ";
00224       ROS_DEBUG("U-Blox received %li bytes \n%s", bytes_transfered, 
00225                oss.str().c_str());
00226     }
00227 
00228     if (read_callback_)
00229       read_callback_(in_.data(), in_buffer_size_);
00230 
00231     read_condition_.notify_all();
00232   }
00233 
00234   if (!stopping_)
00235     io_service_->post(boost::bind(&AsyncWorker<StreamT>::doRead, this));
00236 }
00237 
00238 template <typename StreamT>
00239 void AsyncWorker<StreamT>::doClose() {
00240   ScopedLock lock(read_mutex_);
00241   stopping_ = true;
00242   boost::system::error_code error;
00243   stream_->cancel(error);
00244 }
00245 
00246 template <typename StreamT>
00247 void AsyncWorker<StreamT>::wait(
00248     const boost::posix_time::time_duration& timeout) {
00249   ScopedLock lock(read_mutex_);
00250   read_condition_.timed_wait(lock, timeout);
00251 }
00252 
00253 }  // namespace ublox_gps
00254 
00255 #endif  // UBLOX_GPS_ASYNC_WORKER_H


ublox_gps
Author(s): Johannes Meyer
autogenerated on Fri Aug 11 2017 02:31:06