Program Listing for File stream.h

Return to documentation for file (/tmp/ws/src/ur_client_library/include/ur_client_library/comm/stream.h)

/*
 * Copyright 2019, FZI Forschungszentrum Informatik (templating)
 *
 * Copyright 2017, 2018 Simon Rasmussen (refactor)
 *
 * Copyright 2015, 2016 Thomas Timm Andersen (original version)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once
#include <netdb.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <atomic>
#include <chrono>
#include <mutex>
#include <string>
#include "ur_client_library/log.h"
#include "ur_client_library/comm/tcp_socket.h"

namespace urcl
{
namespace comm
{
template <typename T>
class URStream : public TCPSocket
{
public:
  URStream(const std::string& host, int port) : host_(host), port_(port)
  {
  }

  bool connect(const size_t max_num_tries = 0,
               const std::chrono::milliseconds reconnection_time = std::chrono::seconds(10))
  {
    return TCPSocket::setup(host_, port_, max_num_tries, reconnection_time);
  }

  void disconnect()
  {
    URCL_LOG_DEBUG("Disconnecting from %s:%d", host_.c_str(), port_);
    TCPSocket::close();
  }

  bool closed()
  {
    return getState() == SocketState::Closed;
  }

  bool read(uint8_t* buf, const size_t buf_len, size_t& read);

  bool write(const uint8_t* buf, const size_t buf_len, size_t& written);

  std::string getHost()
  {
    return host_;
  }

private:
  std::string host_;
  int port_;
  std::mutex write_mutex_, read_mutex_;
};

template <typename T>
bool URStream<T>::write(const uint8_t* buf, const size_t buf_len, size_t& written)
{
  std::lock_guard<std::mutex> lock(write_mutex_);
  return TCPSocket::write(buf, buf_len, written);
}

template <typename T>
bool URStream<T>::read(uint8_t* buf, const size_t buf_len, size_t& total)
{
  std::lock_guard<std::mutex> lock(read_mutex_);

  bool initial = true;
  uint8_t* buf_pos = buf;
  size_t remainder = sizeof(typename T::HeaderType::_package_size_type);
  size_t read = 0;

  while (remainder > 0 && TCPSocket::read(buf_pos, remainder, read))
  {
    if (initial)
    {
      remainder = T::HeaderType::getPackageLength(buf);
      if (remainder >= (buf_len - sizeof(typename T::HeaderType::_package_size_type)))
      {
        URCL_LOG_ERROR("Packet size %zd is larger than buffer %zu, discarding.", remainder, buf_len);
        return false;
      }
      initial = false;
    }

    total += read;
    buf_pos += read;
    remainder -= read;
  }

  return remainder == 0;
}
}  // namespace comm
}  // namespace urcl