.. _program_listing_file__tmp_ws_src_ublox_ublox_gps_include_ublox_gps_async_worker.hpp: Program Listing for File async_worker.hpp ========================================= |exhale_lsh| :ref:`Return to documentation for file ` (``/tmp/ws/src/ublox/ublox_gps/include/ublox_gps/async_worker.hpp``) .. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS .. code-block:: cpp //============================================================================== // Copyright (c) 2012, Johannes Meyer, TU Darmstadt // All rights reserved. // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions are met: // * Redistributions of source code must retain the above copyright // notice, this list of conditions and the following disclaimer. // * Redistributions in binary form must reproduce the above copyright // notice, this list of conditions and the following disclaimer in the // documentation and/or other materials provided with the distribution. // * Neither the name of the Flight Systems and Automatic Control group, // TU Darmstadt, nor the names of its contributors may be used to // endorse or promote products derived from this software without // specific prior written permission. // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE // ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER BE LIABLE FOR ANY // DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES // (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; // LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND // ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. //============================================================================== #ifndef UBLOX_GPS_ASYNC_WORKER_HPP #define UBLOX_GPS_ASYNC_WORKER_HPP #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "worker.hpp" namespace ublox_gps { template class AsyncWorker final : public Worker { public: explicit AsyncWorker(std::shared_ptr stream, std::shared_ptr io_service, std::size_t buffer_size, int debug, const rclcpp::Logger& logger); ~AsyncWorker() override; AsyncWorker(AsyncWorker &&c) = delete; AsyncWorker &operator=(AsyncWorker &&c) = delete; AsyncWorker(const AsyncWorker &c) = delete; AsyncWorker &operator=(const AsyncWorker &c) = delete; void setCallback(const WorkerCallback& callback) override { read_callback_ = callback; } void setRawDataCallback(const WorkerRawCallback& callback) override { raw_callback_ = callback; } bool send(const unsigned char* data, const unsigned int size) override; void wait(const std::chrono::milliseconds& timeout) override; bool isOpen() const override { return stream_->is_open(); } private: void doRead(); void readEnd(const asio::error_code& error, std::size_t bytes_transferred); void doWrite(); void doClose(); std::shared_ptr stream_; std::shared_ptr io_service_; std::mutex read_mutex_; std::condition_variable read_condition_; std::vector in_; std::size_t in_buffer_size_; std::mutex write_mutex_; std::condition_variable write_condition_; std::vector out_; std::shared_ptr background_thread_; WorkerCallback read_callback_; WorkerRawCallback raw_callback_; bool stopping_; int debug_; rclcpp::Logger logger_; }; template AsyncWorker::AsyncWorker(std::shared_ptr stream, std::shared_ptr io_service, std::size_t buffer_size, int debug, const rclcpp::Logger& logger) : stream_(stream), io_service_(io_service), in_buffer_size_(0), stopping_(false), debug_(debug), logger_(logger) { in_.resize(buffer_size); out_.reserve(buffer_size); io_service_->post(std::bind(&AsyncWorker::doRead, this)); background_thread_ = std::make_shared([this]{ io_service_->run(); }); } template AsyncWorker::~AsyncWorker() { io_service_->post(std::bind(&AsyncWorker::doClose, this)); background_thread_->join(); //io_service_->reset(); } template bool AsyncWorker::send(const unsigned char* data, const unsigned int size) { std::lock_guard lock(write_mutex_); if (size == 0) { RCLCPP_ERROR(logger_, "Ublox AsyncWorker::send: Size of message to send is 0"); return true; } if (out_.capacity() - out_.size() < size) { RCLCPP_ERROR(logger_, "Ublox AsyncWorker::send: Output buffer too full to send message"); return false; } out_.insert(out_.end(), data, data + size); io_service_->post(std::bind(&AsyncWorker::doWrite, this)); return true; } template void AsyncWorker::doWrite() { std::lock_guard lock(write_mutex_); // Do nothing if out buffer is empty if (out_.size() == 0) { return; } // Write all the data in the out buffer asio::write(*stream_, asio::buffer(out_.data(), out_.size())); if (debug_ >= 2) { // Print the data that was sent std::ostringstream oss; for (std::vector::iterator it = out_.begin(); it != out_.end(); ++it) { oss << std::hex << static_cast(*it) << " "; } RCLCPP_DEBUG(logger_, "U-Blox sent %li bytes: \n%s", out_.size(), oss.str().c_str()); } // Clear the buffer & unlock out_.clear(); write_condition_.notify_all(); } template <> inline void AsyncWorker::doWrite() { std::lock_guard lock(write_mutex_); // Do nothing if out buffer is empty if (out_.size() == 0) { return; } // Write all the data in the out buffer stream_->send(asio::buffer(out_.data(), out_.size())); if (debug_ >= 2) { // Print the data that was sent std::ostringstream oss; for (std::vector::iterator it = out_.begin(); it != out_.end(); ++it) { oss << std::hex << static_cast(*it) << " "; } RCLCPP_DEBUG(logger_, "U-Blox sent %li bytes: \n%s", out_.size(), oss.str().c_str()); } // Clear the buffer & unlock out_.clear(); write_condition_.notify_all(); } template void AsyncWorker::doRead() { std::lock_guard lock(read_mutex_); if (in_.size() - in_buffer_size_ == 0) { // In some circumstances, it is possible that there is no room left in the // buffer. This can happen, for instance, if one of the UBlox messages // has a value in the Length field that is much larger than this buffer // can accomodate. We definitely don't want to ask for a 0-byte read (as // we will get into an endless loop of asking for, and then receiving, // 0 bytes), so we just throw away all of the data in the buffer. in_buffer_size_ = 0; } stream_->async_read_some( asio::buffer(in_.data() + in_buffer_size_, in_.size() - in_buffer_size_), std::bind(&AsyncWorker::readEnd, this, std::placeholders::_1, std::placeholders::_2)); } template <> inline void AsyncWorker::doRead() { std::lock_guard lock(read_mutex_); if (in_.size() - in_buffer_size_ == 0) { // In some circumstances, it is possible that there is no room left in the // buffer. This can happen, for instance, if one of the UBlox messages // has a value in the Length field that is much larger than this buffer // can accomodate. We definitely don't want to ask for a 0-byte read (as // we will get into an endless loop of asking for, and then receiving, // 0 bytes), so we just throw away all of the data in the buffer. in_buffer_size_ = 0; } stream_->async_receive( asio::buffer(in_.data() + in_buffer_size_, in_.size() - in_buffer_size_), std::bind(&AsyncWorker::readEnd, this, std::placeholders::_1, std::placeholders::_2)); } template void AsyncWorker::readEnd(const asio::error_code& error, std::size_t bytes_transferred) { std::lock_guard lock(read_mutex_); if (error) { RCLCPP_ERROR(logger_, "U-Blox ASIO input buffer read error: %s, %li", error.message().c_str(), bytes_transferred); } else if (bytes_transferred > 0) { in_buffer_size_ += bytes_transferred; unsigned char *pRawDataStart = &(*(in_.begin() + (in_buffer_size_ - bytes_transferred))); std::size_t raw_data_stream_size = bytes_transferred; if (raw_callback_) { raw_callback_(pRawDataStart, raw_data_stream_size); } if (debug_ >= 4) { std::ostringstream oss; for (std::vector::iterator it = in_.begin() + in_buffer_size_ - bytes_transferred; it != in_.begin() + in_buffer_size_; ++it) { oss << std::hex << static_cast(*it) << " "; } RCLCPP_DEBUG(logger_, "U-Blox received %li bytes \n%s", bytes_transferred, oss.str().c_str()); } if (read_callback_) { in_buffer_size_ -= read_callback_(in_.data(), in_buffer_size_); } read_condition_.notify_all(); } else { RCLCPP_ERROR(logger_, "U-Blox ASIO transferred zero bytes"); } if (!stopping_) { io_service_->post(std::bind(&AsyncWorker::doRead, this)); } } template void AsyncWorker::doClose() { std::lock_guard lock(read_mutex_); stopping_ = true; asio::error_code error; stream_->close(error); if (error) { RCLCPP_ERROR(logger_, "Error while closing the AsyncWorker stream: %s", error.message().c_str()); } } template void AsyncWorker::wait( const std::chrono::milliseconds& timeout) { std::unique_lock lock(read_mutex_); read_condition_.wait_for(lock, timeout); } } // namespace ublox_gps #endif // UBLOX_GPS_ASYNC_WORKER_HPP