Program Listing for File async_worker.hpp

Return to documentation for file (/tmp/ws/src/ublox/ublox_gps/include/ublox_gps/async_worker.hpp)

//==============================================================================
// Copyright (c) 2012, Johannes Meyer, TU Darmstadt
// All rights reserved.

// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//     * Redistributions of source code must retain the above copyright
//       notice, this list of conditions and the following disclaimer.
//     * Redistributions in binary form must reproduce the above copyright
//       notice, this list of conditions and the following disclaimer in the
//       documentation and/or other materials provided with the distribution.
//     * Neither the name of the Flight Systems and Automatic Control group,
//       TU Darmstadt, nor the names of its contributors may be used to
//       endorse or promote products derived from this software without
//       specific prior written permission.

// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER BE LIABLE FOR ANY
// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//==============================================================================

#ifndef UBLOX_GPS_ASYNC_WORKER_HPP
#define UBLOX_GPS_ASYNC_WORKER_HPP

#include <chrono>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

#include <asio/buffer.hpp>
#include <asio/error_code.hpp>
#include <asio/io_service.hpp>
#include <asio/placeholders.hpp>
#include <asio/write.hpp>
#include <asio/ip/udp.hpp>

#include <rclcpp/rclcpp.hpp>

#include "worker.hpp"

namespace ublox_gps {

template <typename StreamT>
class AsyncWorker final : public Worker {
 public:
  explicit AsyncWorker(std::shared_ptr<StreamT> stream,
                       std::shared_ptr<asio::io_service> io_service,
                       std::size_t buffer_size,
                       int debug,
                       const rclcpp::Logger& logger);
  ~AsyncWorker() override;

  AsyncWorker(AsyncWorker &&c) = delete;
  AsyncWorker &operator=(AsyncWorker &&c) = delete;
  AsyncWorker(const AsyncWorker &c) = delete;
  AsyncWorker &operator=(const AsyncWorker &c) = delete;

  void setCallback(const WorkerCallback& callback) override { read_callback_ = callback; }

  void setRawDataCallback(const WorkerRawCallback& callback) override { raw_callback_ = callback; }

  bool send(const unsigned char* data, const unsigned int size) override;
  void wait(const std::chrono::milliseconds& timeout) override;

  bool isOpen() const override { return stream_->is_open(); }

 private:
  void doRead();

  void readEnd(const asio::error_code& error, std::size_t bytes_transferred);

  void doWrite();

  void doClose();

  std::shared_ptr<StreamT> stream_;
  std::shared_ptr<asio::io_service> io_service_;

  std::mutex read_mutex_;
  std::condition_variable read_condition_;
  std::vector<unsigned char> in_;
  std::size_t in_buffer_size_;

  std::mutex write_mutex_;
  std::condition_variable write_condition_;
  std::vector<unsigned char> out_;

  std::shared_ptr<std::thread> background_thread_;
  WorkerCallback read_callback_;
  WorkerRawCallback raw_callback_;

  bool stopping_;

  int debug_;

  rclcpp::Logger logger_;
};

template <typename StreamT>
AsyncWorker<StreamT>::AsyncWorker(std::shared_ptr<StreamT> stream,
        std::shared_ptr<asio::io_service> io_service,
        std::size_t buffer_size,
        int debug,
        const rclcpp::Logger& logger)
    : stream_(stream), io_service_(io_service), in_buffer_size_(0), stopping_(false), debug_(debug), logger_(logger) {
  in_.resize(buffer_size);

  out_.reserve(buffer_size);

  io_service_->post(std::bind(&AsyncWorker<StreamT>::doRead, this));
  background_thread_ = std::make_shared<std::thread>([this]{ io_service_->run(); });
}

template <typename StreamT>
AsyncWorker<StreamT>::~AsyncWorker() {
  io_service_->post(std::bind(&AsyncWorker<StreamT>::doClose, this));
  background_thread_->join();
  //io_service_->reset();
}

template <typename StreamT>
bool AsyncWorker<StreamT>::send(const unsigned char* data,
                                const unsigned int size) {
  std::lock_guard<std::mutex> lock(write_mutex_);
  if (size == 0) {
    RCLCPP_ERROR(logger_, "Ublox AsyncWorker::send: Size of message to send is 0");
    return true;
  }

  if (out_.capacity() - out_.size() < size) {
    RCLCPP_ERROR(logger_, "Ublox AsyncWorker::send: Output buffer too full to send message");
    return false;
  }
  out_.insert(out_.end(), data, data + size);

  io_service_->post(std::bind(&AsyncWorker<StreamT>::doWrite, this));
  return true;
}

template <typename StreamT>
void AsyncWorker<StreamT>::doWrite() {
  std::lock_guard<std::mutex> lock(write_mutex_);
  // Do nothing if out buffer is empty
  if (out_.size() == 0) {
    return;
  }
  // Write all the data in the out buffer
  asio::write(*stream_, asio::buffer(out_.data(), out_.size()));

  if (debug_ >= 2) {
    // Print the data that was sent
    std::ostringstream oss;
    for (std::vector<unsigned char>::iterator it = out_.begin();
         it != out_.end(); ++it) {
      oss << std::hex << static_cast<unsigned int>(*it) << " ";
    }
    RCLCPP_DEBUG(logger_, "U-Blox sent %li bytes: \n%s", out_.size(), oss.str().c_str());
  }
  // Clear the buffer & unlock
  out_.clear();
  write_condition_.notify_all();
}
template <>
inline void AsyncWorker<asio::ip::udp::socket>::doWrite() {
  std::lock_guard<std::mutex> lock(write_mutex_);
  // Do nothing if out buffer is empty
  if (out_.size() == 0) {
    return;
  }
  // Write all the data in the out buffer
  stream_->send(asio::buffer(out_.data(), out_.size()));

  if (debug_ >= 2) {
    // Print the data that was sent
    std::ostringstream oss;
    for (std::vector<unsigned char>::iterator it = out_.begin();
         it != out_.end(); ++it) {
      oss << std::hex << static_cast<unsigned int>(*it) << " ";
    }
    RCLCPP_DEBUG(logger_, "U-Blox sent %li bytes: \n%s", out_.size(), oss.str().c_str());
  }
  // Clear the buffer & unlock
  out_.clear();
  write_condition_.notify_all();
}

template <typename StreamT>
void AsyncWorker<StreamT>::doRead() {
  std::lock_guard<std::mutex> lock(read_mutex_);
  if (in_.size() - in_buffer_size_ == 0) {
    // In some circumstances, it is possible that there is no room left in the
    // buffer.  This can happen, for instance, if one of the UBlox messages
    // has a value in the Length field that is much larger than this buffer
    // can accomodate.  We definitely don't want to ask for a 0-byte read (as
    // we will get into an endless loop of asking for, and then receiving,
    // 0 bytes), so we just throw away all of the data in the buffer.
    in_buffer_size_ = 0;
  }

  stream_->async_read_some(
      asio::buffer(in_.data() + in_buffer_size_,
                   in_.size() - in_buffer_size_),
      std::bind(&AsyncWorker<StreamT>::readEnd, this,
                std::placeholders::_1, std::placeholders::_2));
}
template <>
inline void AsyncWorker<asio::ip::udp::socket>::doRead() {
  std::lock_guard<std::mutex> lock(read_mutex_);
  if (in_.size() - in_buffer_size_ == 0) {
    // In some circumstances, it is possible that there is no room left in the
    // buffer.  This can happen, for instance, if one of the UBlox messages
    // has a value in the Length field that is much larger than this buffer
    // can accomodate.  We definitely don't want to ask for a 0-byte read (as
    // we will get into an endless loop of asking for, and then receiving,
    // 0 bytes), so we just throw away all of the data in the buffer.
    in_buffer_size_ = 0;
  }

  stream_->async_receive(
      asio::buffer(in_.data() + in_buffer_size_,
                   in_.size() - in_buffer_size_),
      std::bind(&AsyncWorker<asio::ip::udp::socket>::readEnd, this,
                std::placeholders::_1, std::placeholders::_2));
}

template <typename StreamT>
void AsyncWorker<StreamT>::readEnd(const asio::error_code& error,
                                   std::size_t bytes_transferred) {
  std::lock_guard<std::mutex> lock(read_mutex_);
  if (error) {
    RCLCPP_ERROR(logger_, "U-Blox ASIO input buffer read error: %s, %li",
                 error.message().c_str(),
                 bytes_transferred);
  } else if (bytes_transferred > 0) {
    in_buffer_size_ += bytes_transferred;

    unsigned char *pRawDataStart = &(*(in_.begin() + (in_buffer_size_ - bytes_transferred)));
    std::size_t raw_data_stream_size = bytes_transferred;

    if (raw_callback_) {
      raw_callback_(pRawDataStart, raw_data_stream_size);
    }

    if (debug_ >= 4) {
      std::ostringstream oss;
      for (std::vector<unsigned char>::iterator it =
               in_.begin() + in_buffer_size_ - bytes_transferred;
           it != in_.begin() + in_buffer_size_; ++it) {
        oss << std::hex << static_cast<unsigned int>(*it) << " ";
      }
      RCLCPP_DEBUG(logger_, "U-Blox received %li bytes \n%s", bytes_transferred,
                   oss.str().c_str());
    }

    if (read_callback_) {
      in_buffer_size_ -= read_callback_(in_.data(), in_buffer_size_);
    }

    read_condition_.notify_all();
  } else {
    RCLCPP_ERROR(logger_, "U-Blox ASIO transferred zero bytes");
  }

  if (!stopping_) {
    io_service_->post(std::bind(&AsyncWorker<StreamT>::doRead, this));
  }
}

template <typename StreamT>
void AsyncWorker<StreamT>::doClose() {
  std::lock_guard<std::mutex> lock(read_mutex_);
  stopping_ = true;
  asio::error_code error;
  stream_->close(error);
  if (error) {
    RCLCPP_ERROR(logger_, "Error while closing the AsyncWorker stream: %s",
                 error.message().c_str());
  }
}

template <typename StreamT>
void AsyncWorker<StreamT>::wait(
    const std::chrono::milliseconds& timeout) {
  std::unique_lock<std::mutex> lock(read_mutex_);
  read_condition_.wait_for(lock, timeout);
}

}  // namespace ublox_gps

#endif  // UBLOX_GPS_ASYNC_WORKER_HPP