Program Listing for File async_manager.hpp

Return to documentation for file (/tmp/ws/src/septentrio_gnss_driver/include/septentrio_gnss_driver/communication/async_manager.hpp)

// *****************************************************************************
//
// © Copyright 2020, Septentrio NV/SA.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//    1. Redistributions of source code must retain the above copyright
//       notice, this list of conditions and the following disclaimer.
//    2. Redistributions in binary form must reproduce the above copyright
//       notice, this list of conditions and the following disclaimer in the
//       documentation and/or other materials provided with the distribution.
//    3. Neither the name of the copyright holder nor the names of its
//       contributors may be used to endorse or promote products derived
//       from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
// POSSIBILITY OF SUCH DAMAGE.
//
// *****************************************************************************

// *****************************************************************************
//
// Boost Software License - Version 1.0 - August 17th, 2003
//
// Permission is hereby granted, free of charge, to any person or organization
// obtaining a copy of the software and accompanying documentation covered by
// this license (the "Software") to use, reproduce, display, distribute,
// execute, and transmit the Software, and to prepare derivative works of the
// Software, and to permit third-parties to whom the Software is furnished to
// do so, all subject to the following:

// The copyright notices in the Software and this entire statement, including
// the above license grant, this restriction and the following disclaimer,
// must be included in all copies of the Software, in whole or in part, and
// all derivative works of the Software, unless such copies or derivative
// works are solely in the form of machine-executable object code generated by
// a source language processor.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
// SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
// FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
// ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//
// *****************************************************************************

#pragma once

// Boost includes
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/regex.hpp>

// ROSaic includes
#include <septentrio_gnss_driver/crc/crc.hpp>
#include <septentrio_gnss_driver/parsers/parsing_utilities.hpp>

// local includes
#include <septentrio_gnss_driver/communication/io.hpp>
#include <septentrio_gnss_driver/communication/telegram.hpp>

namespace io {

    class AsyncManagerBase
    {
    public:
        virtual ~AsyncManagerBase() {}
        [[nodiscard]] virtual bool connect() = 0;
        virtual void send(const std::string& cmd) = 0;
    };

    template <typename IoType>
    class AsyncManager : public AsyncManagerBase
    {
    public:
        AsyncManager(ROSaicNodeBase* node, TelegramQueue* telegramQueue);

        ~AsyncManager();

        [[nodiscard]] bool connect();

        void setPort(const std::string& port);

        void send(const std::string& cmd);

    private:
        void receive();
        void close();
        void runIoService();
        void runWatchdog();
        void write(const std::string& cmd);
        void resync();
        template <uint8_t index>
        void readSync();
        void readSbfHeader();
        void readSbf(std::size_t length);
        void readUnknown();
        void readString();
        void readStringElements();

        ROSaicNodeBase* node_;
        std::shared_ptr<boost::asio::io_service> ioService_;
        IoType ioInterface_;
        std::atomic<bool> running_;
        std::thread ioThread_;
        std::thread watchdogThread_;

        std::array<uint8_t, 1> buf_;
        Timestamp recvStamp_;
        std::shared_ptr<Telegram> telegram_;
        TelegramQueue* telegramQueue_;
    };

    template <typename IoType>
    AsyncManager<IoType>::AsyncManager(ROSaicNodeBase* node,
                                       TelegramQueue* telegramQueue) :
        node_(node),
        ioService_(new boost::asio::io_service), ioInterface_(node, ioService_),
        telegramQueue_(telegramQueue)
    {
        node_->log(log_level::DEBUG, "AsyncManager created.");
    }

    template <typename IoType>
    AsyncManager<IoType>::~AsyncManager()
    {
        running_ = false;
        close();
        node_->log(log_level::DEBUG, "AsyncManager shutting down threads");
        ioService_->stop();
        ioThread_.join();
        watchdogThread_.join();
        node_->log(log_level::DEBUG, "AsyncManager threads stopped");
    }

    template <typename IoType>
    [[nodiscard]] bool AsyncManager<IoType>::connect()
    {
        running_ = true;

        if (!ioInterface_.connect())
        {
            return false;
        }
        receive();

        return true;
    }

    template <typename IoType>
    void AsyncManager<IoType>::setPort(const std::string& port)
    {
        ioInterface_.setPort(port);
    }

    template <typename IoType>
    void AsyncManager<IoType>::send(const std::string& cmd)
    {
        if (cmd.size() == 0)
        {
            node_->log(log_level::ERROR,
                       "AsyncManager message size to be sent to the Rx would be 0");
            return;
        }

        ioService_->post(boost::bind(&AsyncManager<IoType>::write, this, cmd));
    }

    template <typename IoType>
    void AsyncManager<IoType>::receive()
    {
        resync();
        ioThread_ =
            std::thread(std::bind(&AsyncManager<IoType>::runIoService, this));
        if (!watchdogThread_.joinable())
            watchdogThread_ =
                std::thread(std::bind(&AsyncManager::runWatchdog, this));
    }

    template <typename IoType>
    void AsyncManager<IoType>::close()
    {
        ioService_->post([this]() { ioInterface_.close(); });
    }

    template <typename IoType>
    void AsyncManager<IoType>::runIoService()
    {
        ioService_->run();
        node_->log(log_level::DEBUG, "AsyncManager ioService terminated.");
    }

    template <typename IoType>
    void AsyncManager<IoType>::runWatchdog()
    {
        while (running_)
        {
            std::this_thread::sleep_for(std::chrono::milliseconds(1000));
            if (running_ && ioService_->stopped())
            {
                if (node_->settings()->read_from_sbf_log ||
                    node_->settings()->read_from_pcap)
                {
                    node_->log(
                        log_level::INFO,
                        "AsyncManager finished reading file. Node will continue to publish queued messages.");
                    break;
                } else
                {
                    node_->log(log_level::ERROR,
                               "AsyncManager connection lost. Trying to reconnect.");
                    ioService_->reset();
                    ioThread_.join();
                    while (!ioInterface_.connect())
                        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
                    receive();
                }
            } else if (running_ && std::is_same<TcpIo, IoType>::value)
            {
                // Send to check if TCP connection still alive
                std::string empty = " ";
                boost::asio::async_write(
                    *(ioInterface_.stream_), boost::asio::buffer(empty.data(), 1),
                    [](boost::system::error_code ec, std::size_t /*length*/) {});
            }
        }
    }

    template <typename IoType>
    void AsyncManager<IoType>::write(const std::string& cmd)
    {
        boost::asio::async_write(
            *(ioInterface_.stream_), boost::asio::buffer(cmd.data(), cmd.size()),
            [this, cmd](boost::system::error_code ec, std::size_t /*length*/) {
                if (!ec)
                {
                    // Prints the data that was sent
                    node_->log(log_level::DEBUG, "AsyncManager sent the following " +
                                                     std::to_string(cmd.size()) +
                                                     " bytes to the Rx: " + cmd);
                } else
                {
                    node_->log(log_level::ERROR,
                               "AsyncManager was unable to send the following " +
                                   std::to_string(cmd.size()) +
                                   " bytes to the Rx: " + cmd);
                }
            });
    }

    template <typename IoType>
    void AsyncManager<IoType>::resync()
    {
        telegram_.reset(new Telegram);
        readSync<0>();
    }

    template <typename IoType>
    template <uint8_t index>
    void AsyncManager<IoType>::readSync()
    {
        static_assert(index < 3);

        boost::asio::async_read(
            *(ioInterface_.stream_),
            boost::asio::buffer(telegram_->message.data() + index, 1),
            [this](boost::system::error_code ec, std::size_t numBytes) {
                Timestamp stamp = node_->getTime();

                if (!ec)
                {
                    if (numBytes == 1)
                    {
                        uint8_t& currByte = telegram_->message[index];

                        if (currByte == SYNC_BYTE_1)
                        {
                            telegram_->stamp = stamp;
                            readSync<1>();
                        } else
                        {
                            switch (index)
                            {
                            case 0:
                            {
                                telegram_->type = telegram_type::UNKNOWN;
                                readUnknown();
                                break;
                            }
                            case 1:
                            {
                                switch (currByte)
                                {
                                case SBF_SYNC_BYTE_2:
                                {
                                    telegram_->type = telegram_type::SBF;
                                    readSbfHeader();
                                    break;
                                }
                                case NMEA_SYNC_BYTE_2:
                                {
                                    telegram_->type = telegram_type::NMEA;
                                    readSync<2>();
                                    break;
                                }
                                case NMEA_INS_SYNC_BYTE_2:
                                {
                                    telegram_->type = telegram_type::NMEA_INS;
                                    readSync<2>();
                                    break;
                                }
                                case RESPONSE_SYNC_BYTE_2:
                                {
                                    telegram_->type = telegram_type::RESPONSE;
                                    readSync<2>();
                                    break;
                                }
                                default:
                                {
                                    std::stringstream ss;
                                    ss << std::hex << currByte;
                                    node_->log(
                                        log_level::DEBUG,
                                        "AsyncManager sync byte 2 read fault, should never come here.. Received byte was " +
                                            ss.str());
                                    resync();
                                    break;
                                }
                                }
                                break;
                            }
                            case 2:
                            {
                                switch (currByte)
                                {
                                case NMEA_SYNC_BYTE_3:
                                {
                                    if (telegram_->type == telegram_type::NMEA)
                                        readString();
                                    else
                                        resync();
                                    break;
                                }
                                case NMEA_INS_SYNC_BYTE_3:
                                {
                                    if (telegram_->type == telegram_type::NMEA_INS)
                                        readString();
                                    else
                                        resync();
                                    break;
                                }
                                case RESPONSE_SYNC_BYTE_3:
                                {
                                    if (telegram_->type == telegram_type::RESPONSE)
                                        readString();
                                    else
                                        resync();
                                    break;
                                }
                                case RESPONSE_SYNC_BYTE_3a:
                                {
                                    if (telegram_->type == telegram_type::RESPONSE)
                                        readString();
                                    else
                                        resync();
                                    break;
                                }
                                case ERROR_SYNC_BYTE_3:
                                {
                                    if (telegram_->type == telegram_type::RESPONSE)
                                    {
                                        telegram_->type =
                                            telegram_type::ERROR_RESPONSE;
                                        readString();
                                    } else
                                        resync();
                                    break;
                                }
                                default:
                                {
                                    std::stringstream ss;
                                    ss << std::hex << currByte;
                                    node_->log(
                                        log_level::DEBUG,
                                        "AsyncManager sync byte 3 read fault, should never come here. Received byte was " +
                                            ss.str());
                                    resync();
                                    break;
                                }
                                }
                                break;
                            }
                            default:
                            {
                                node_->log(
                                    log_level::DEBUG,
                                    "AsyncManager sync read fault, should never come here.");
                                resync();
                                break;
                            }
                            }
                        }
                    } else
                    {
                        node_->log(
                            log_level::DEBUG,
                            "AsyncManager sync read fault, wrong number of bytes read: " +
                                std::to_string(numBytes));
                        resync();
                    }
                } else
                {
                    node_->log(log_level::DEBUG,
                               "AsyncManager sync read error: " + ec.message());
                }
            });
    }

    template <typename IoType>
    void AsyncManager<IoType>::readSbfHeader()
    {
        telegram_->message.resize(SBF_HEADER_SIZE);

        boost::asio::async_read(
            *(ioInterface_.stream_),
            boost::asio::buffer(telegram_->message.data() + 2, SBF_HEADER_SIZE - 2),
            [this](boost::system::error_code ec, std::size_t numBytes) {
                if (!ec)
                {
                    if (numBytes == (SBF_HEADER_SIZE - 2))
                    {
                        uint16_t length =
                            parsing_utilities::getLength(telegram_->message);
                        if (length > MAX_SBF_SIZE)
                        {
                            node_->log(
                                log_level::DEBUG,
                                "AsyncManager SBF header read fault, length of block exceeds " +
                                    std::to_string(MAX_SBF_SIZE) + ": " +
                                    std::to_string(length));
                        } else
                            readSbf(length);
                    } else
                    {
                        node_->log(
                            log_level::DEBUG,
                            "AsyncManager SBF header read fault, wrong number of bytes read: " +
                                std::to_string(numBytes));
                        resync();
                    }
                } else
                {
                    node_->log(log_level::DEBUG,
                               "AsyncManager SBF header read error: " +
                                   ec.message());
                }
            });
    }

    template <typename IoType>
    void AsyncManager<IoType>::readSbf(std::size_t length)
    {
        telegram_->message.resize(length);

        boost::asio::async_read(
            *(ioInterface_.stream_),
            boost::asio::buffer(telegram_->message.data() + SBF_HEADER_SIZE,
                                length - SBF_HEADER_SIZE),
            [this, length](boost::system::error_code ec, std::size_t numBytes) {
                if (!ec)
                {
                    if (numBytes == (length - SBF_HEADER_SIZE))
                    {
                        if (crc::isValid(telegram_->message))
                        {
                            telegramQueue_->push(telegram_);
                        } else
                            node_->log(log_level::DEBUG,
                                       "AsyncManager crc failed for SBF  " +
                                           std::to_string(parsing_utilities::getId(
                                               telegram_->message)) +
                                           ".");
                    } else
                    {
                        node_->log(
                            log_level::DEBUG,
                            "AsyncManager SBF read fault, wrong number of bytes read: " +
                                std::to_string(numBytes));
                    }
                    resync();
                } else
                {
                    node_->log(log_level::DEBUG,
                               "AsyncManager SBF read error: " + ec.message());
                }
            });
    }

    template <typename IoType>
    void AsyncManager<IoType>::readUnknown()
    {
        telegram_->message.resize(1);
        telegram_->message.reserve(256);
        readStringElements();
    }

    template <typename IoType>
    void AsyncManager<IoType>::readString()
    {
        telegram_->message.resize(3);
        telegram_->message.reserve(256);
        readStringElements();
    }

    template <typename IoType>
    void AsyncManager<IoType>::readStringElements()
    {
        boost::asio::async_read(
            *(ioInterface_.stream_), boost::asio::buffer(buf_.data(), 1),
            [this](boost::system::error_code ec, std::size_t numBytes) {
                if (!ec)
                {
                    if (numBytes == 1)
                    {
                        telegram_->message.push_back(buf_[0]);
                        /*node_->log(log_level::DEBUG,
                                   "Buffer: " +
                                       std::string(telegram_->message.begin(),
                                                   telegram_->message.end()));*/

                        switch (buf_[0])
                        {
                        case SYNC_BYTE_1:
                        {
                            telegram_.reset(new Telegram);
                            telegram_->message[0] = buf_[0];
                            telegram_->stamp = node_->getTime();
                            node_->log(
                                log_level::DEBUG,
                                "AsyncManager string read fault, sync 1 found.");
                            readSync<1>();
                            break;
                        }
                        case LF:
                        {
                            if (telegram_->message[telegram_->message.size() - 2] ==
                                CR)
                                telegramQueue_->push(telegram_);
                            else
                                node_->log(
                                    log_level::DEBUG,
                                    "LF wo CR: " +
                                        std::string(telegram_->message.begin(),
                                                    telegram_->message.end()));
                            resync();
                            break;
                        }
                        case CONNECTION_DESCRIPTOR_FOOTER:
                        {
                            telegram_->type = telegram_type::CONNECTION_DESCRIPTOR;
                            telegramQueue_->push(telegram_);
                            resync();
                            break;
                        }
                        default:
                        {
                            readStringElements();
                            break;
                        }
                        }
                    } else
                    {
                        node_->log(
                            log_level::DEBUG,
                            "AsyncManager string read fault, wrong number of bytes read: " +
                                std::to_string(numBytes));
                        resync();
                    }
                } else
                {
                    node_->log(log_level::DEBUG,
                               "AsyncManager string read error: " + ec.message());
                }
            });
    }
} // namespace io