Program Listing for File stream.h
↰ Return to documentation for file (/tmp/ws/src/ur_client_library/include/ur_client_library/comm/stream.h
)
/*
* Copyright 2019, FZI Forschungszentrum Informatik (templating)
*
* Copyright 2017, 2018 Simon Rasmussen (refactor)
*
* Copyright 2015, 2016 Thomas Timm Andersen (original version)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <netdb.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <atomic>
#include <chrono>
#include <mutex>
#include <string>
#include "ur_client_library/log.h"
#include "ur_client_library/comm/tcp_socket.h"
namespace urcl
{
namespace comm
{
template <typename T>
class URStream : public TCPSocket
{
public:
URStream(const std::string& host, int port) : host_(host), port_(port)
{
}
bool connect(const size_t max_num_tries = 0,
const std::chrono::milliseconds reconnection_time = std::chrono::seconds(10))
{
return TCPSocket::setup(host_, port_, max_num_tries, reconnection_time);
}
void disconnect()
{
URCL_LOG_DEBUG("Disconnecting from %s:%d", host_.c_str(), port_);
TCPSocket::close();
}
bool closed()
{
return getState() == SocketState::Closed;
}
bool read(uint8_t* buf, const size_t buf_len, size_t& read);
bool write(const uint8_t* buf, const size_t buf_len, size_t& written);
std::string getHost()
{
return host_;
}
private:
std::string host_;
int port_;
std::mutex write_mutex_, read_mutex_;
};
template <typename T>
bool URStream<T>::write(const uint8_t* buf, const size_t buf_len, size_t& written)
{
std::lock_guard<std::mutex> lock(write_mutex_);
return TCPSocket::write(buf, buf_len, written);
}
template <typename T>
bool URStream<T>::read(uint8_t* buf, const size_t buf_len, size_t& total)
{
std::lock_guard<std::mutex> lock(read_mutex_);
bool initial = true;
uint8_t* buf_pos = buf;
size_t remainder = sizeof(typename T::HeaderType::_package_size_type);
size_t read = 0;
while (remainder > 0 && TCPSocket::read(buf_pos, remainder, read))
{
if (initial)
{
remainder = T::HeaderType::getPackageLength(buf);
if (remainder >= (buf_len - sizeof(typename T::HeaderType::_package_size_type)))
{
URCL_LOG_ERROR("Packet size %zd is larger than buffer %zu, discarding.", remainder, buf_len);
return false;
}
initial = false;
}
total += read;
buf_pos += read;
remainder -= read;
}
return remainder == 0;
}
} // namespace comm
} // namespace urcl