async_worker.h
Go to the documentation of this file.
00001 //==============================================================================
00002 // Copyright (c) 2012, Johannes Meyer, TU Darmstadt
00003 // All rights reserved.
00004 
00005 // Redistribution and use in source and binary forms, with or without
00006 // modification, are permitted provided that the following conditions are met:
00007 //     * Redistributions of source code must retain the above copyright
00008 //       notice, this list of conditions and the following disclaimer.
00009 //     * Redistributions in binary form must reproduce the above copyright
00010 //       notice, this list of conditions and the following disclaimer in the
00011 //       documentation and/or other materials provided with the distribution.
00012 //     * Neither the name of the Flight Systems and Automatic Control group,
00013 //       TU Darmstadt, nor the names of its contributors may be used to
00014 //       endorse or promote products derived from this software without
00015 //       specific prior written permission.
00016 
00017 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00018 // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00019 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00020 // ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER BE LIABLE FOR ANY
00021 // DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
00022 // (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00023 // LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
00024 // ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
00025 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00026 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00027 //==============================================================================
00028 
00029 #ifndef UBLOX_GPS_ASYNC_WORKER_H
00030 #define UBLOX_GPS_ASYNC_WORKER_H
00031 
00032 #include <ublox_gps/gps.h>
00033 
00034 #include <boost/asio.hpp>
00035 #include <boost/bind.hpp>
00036 #include <boost/format.hpp>
00037 #include <boost/thread.hpp>
00038 #include <boost/thread/condition.hpp>
00039 
00040 
00041 #include "worker.h"
00042 
00043 namespace ublox_gps {
00044 
00045 int debug; 
00046 
00050 template <typename StreamT>
00051 class AsyncWorker : public Worker {
00052  public:
00053   typedef boost::mutex Mutex;
00054   typedef boost::mutex::scoped_lock ScopedLock;
00055 
00062   AsyncWorker(boost::shared_ptr<StreamT> stream,
00063               boost::shared_ptr<boost::asio::io_service> io_service,
00064               std::size_t buffer_size = 8192);
00065   virtual ~AsyncWorker();
00066 
00071   void setCallback(const Callback& callback) { read_callback_ = callback; }
00072 
00077   void setRawDataCallback(const Callback& callback) { write_callback_ = callback; }
00078 
00084   bool send(const unsigned char* data, const unsigned int size);
00089   void wait(const boost::posix_time::time_duration& timeout);
00090 
00091   bool isOpen() const { return stream_->is_open(); }
00092 
00093  protected:
00097   void doRead();
00098 
00104   void readEnd(const boost::system::error_code&, std::size_t);
00105 
00109   void doWrite();
00110 
00114   void doClose();
00115 
00116   boost::shared_ptr<StreamT> stream_; 
00117   boost::shared_ptr<boost::asio::io_service> io_service_; 
00118 
00119   Mutex read_mutex_; 
00120   boost::condition read_condition_;
00121   std::vector<unsigned char> in_; 
00122   std::size_t in_buffer_size_; 
00123 
00124 
00125   Mutex write_mutex_; 
00126   boost::condition write_condition_;
00127   std::vector<unsigned char> out_; 
00128 
00129   boost::shared_ptr<boost::thread> background_thread_; 
00130 
00131   Callback read_callback_; 
00132   Callback write_callback_; 
00133 
00134   bool stopping_; 
00135 };
00136 
00137 template <typename StreamT>
00138 AsyncWorker<StreamT>::AsyncWorker(boost::shared_ptr<StreamT> stream,
00139         boost::shared_ptr<boost::asio::io_service> io_service,
00140         std::size_t buffer_size)
00141     : stopping_(false) {
00142   stream_ = stream;
00143   io_service_ = io_service;
00144   in_.resize(buffer_size);
00145   in_buffer_size_ = 0;
00146 
00147   out_.reserve(buffer_size);
00148 
00149   io_service_->post(boost::bind(&AsyncWorker<StreamT>::doRead, this));
00150   background_thread_.reset(new boost::thread(
00151       boost::bind(&boost::asio::io_service::run, io_service_)));
00152 }
00153 
00154 template <typename StreamT>
00155 AsyncWorker<StreamT>::~AsyncWorker() {
00156   io_service_->post(boost::bind(&AsyncWorker<StreamT>::doClose, this));
00157   background_thread_->join();
00158   //io_service_->reset();
00159 }
00160 
00161 template <typename StreamT>
00162 bool AsyncWorker<StreamT>::send(const unsigned char* data,
00163                                 const unsigned int size) {
00164   ScopedLock lock(write_mutex_);
00165   if(size == 0) {
00166     ROS_ERROR("Ublox AsyncWorker::send: Size of message to send is 0");
00167     return true;
00168   }
00169 
00170   if (out_.capacity() - out_.size() < size) {
00171     ROS_ERROR("Ublox AsyncWorker::send: Output buffer too full to send message");
00172     return false;
00173   }
00174   out_.insert(out_.end(), data, data + size);
00175 
00176   io_service_->post(boost::bind(&AsyncWorker<StreamT>::doWrite, this));
00177   return true;
00178 }
00179 
00180 template <typename StreamT>
00181 void AsyncWorker<StreamT>::doWrite() {
00182   ScopedLock lock(write_mutex_);
00183   // Do nothing if out buffer is empty
00184   if (out_.size() == 0) {
00185     return;
00186   }
00187   // Write all the data in the out buffer
00188   boost::asio::write(*stream_, boost::asio::buffer(out_.data(), out_.size()));
00189 
00190   if (debug >= 2) {
00191     // Print the data that was sent
00192     std::ostringstream oss;
00193     for (std::vector<unsigned char>::iterator it = out_.begin();
00194          it != out_.end(); ++it)
00195       oss << boost::format("%02x") % static_cast<unsigned int>(*it) << " ";
00196     ROS_DEBUG("U-Blox sent %li bytes: \n%s", out_.size(), oss.str().c_str());
00197   }
00198   // Clear the buffer & unlock
00199   out_.clear();
00200   write_condition_.notify_all();
00201 }
00202 
00203 template <typename StreamT>
00204 void AsyncWorker<StreamT>::doRead() {
00205   ScopedLock lock(read_mutex_);
00206   stream_->async_read_some(
00207       boost::asio::buffer(in_.data() + in_buffer_size_,
00208                           in_.size() - in_buffer_size_),
00209                           boost::bind(&AsyncWorker<StreamT>::readEnd, this,
00210                               boost::asio::placeholders::error,
00211                               boost::asio::placeholders::bytes_transferred));
00212 }
00213 
00214 template <typename StreamT>
00215 void AsyncWorker<StreamT>::readEnd(const boost::system::error_code& error,
00216                                    std::size_t bytes_transfered) {
00217   ScopedLock lock(read_mutex_);
00218   if (error) {
00219     ROS_ERROR("U-Blox ASIO input buffer read error: %s, %li",
00220               error.message().c_str(),
00221               bytes_transfered);
00222   } else if (bytes_transfered > 0) {
00223     in_buffer_size_ += bytes_transfered;
00224 
00225     unsigned char *pRawDataStart = &(*(in_.begin() + (in_buffer_size_ - bytes_transfered)));
00226     std::size_t raw_data_stream_size = bytes_transfered;
00227 
00228     if (write_callback_)
00229       write_callback_(pRawDataStart, raw_data_stream_size);
00230 
00231     if (debug >= 4) {
00232       std::ostringstream oss;
00233       for (std::vector<unsigned char>::iterator it =
00234                in_.begin() + in_buffer_size_ - bytes_transfered;
00235            it != in_.begin() + in_buffer_size_; ++it)
00236         oss << boost::format("%02x") % static_cast<unsigned int>(*it) << " ";
00237       ROS_DEBUG("U-Blox received %li bytes \n%s", bytes_transfered,
00238                oss.str().c_str());
00239     }
00240 
00241     if (read_callback_)
00242       read_callback_(in_.data(), in_buffer_size_);
00243 
00244     read_condition_.notify_all();
00245   }
00246 
00247   if (!stopping_)
00248     io_service_->post(boost::bind(&AsyncWorker<StreamT>::doRead, this));
00249 }
00250 
00251 template <typename StreamT>
00252 void AsyncWorker<StreamT>::doClose() {
00253   ScopedLock lock(read_mutex_);
00254   stopping_ = true;
00255   boost::system::error_code error;
00256   stream_->close(error);
00257   if(error)
00258     ROS_ERROR_STREAM(
00259         "Error while closing the AsyncWorker stream: " << error.message());
00260 }
00261 
00262 template <typename StreamT>
00263 void AsyncWorker<StreamT>::wait(
00264     const boost::posix_time::time_duration& timeout) {
00265   ScopedLock lock(read_mutex_);
00266   read_condition_.timed_wait(lock, timeout);
00267 }
00268 
00269 }  // namespace ublox_gps
00270 
00271 #endif  // UBLOX_GPS_ASYNC_WORKER_H


ublox_gps
Author(s): Johannes Meyer
autogenerated on Fri Jun 14 2019 19:26:13