32 #include <boost/asio.hpp> 42 using namespace OpcUa;
44 using namespace OpcUa;
50 class OpcTcpConnection;
58 OpcTcpServer(const AsyncOpcTcp::Parameters & params,
Services::SharedPtr
server,
boost::asio::io_service & ioService, const
Common::Logger::SharedPtr & logger);
60 virtual
void Listen() override;
61 virtual
void Shutdown() override;
67 friend class OpcTcpConnection;
68 void RemoveClient(
std::shared_ptr<OpcTcpConnection>
client);
73 Common::Logger::SharedPtr Logger;
75 std::
set<
std::shared_ptr<OpcTcpConnection>> Clients;
78 tcp::acceptor acceptor;
82 class OpcTcpConnection : public
std::enable_shared_from_this<OpcTcpConnection>, public
OpcUa::
OutputChannel 93 OpcTcpConnection(tcp::socket socket, OpcTcpServer & tcpServer, const
Common::Logger::SharedPtr & logger);
94 static SharedPtr
create(tcp::socket socket, OpcTcpServer & tcpServer,
Services::SharedPtr uaServer, const
Common::Logger::SharedPtr & logger);
107 typedef std::promise<void> Promise;
109 #if BOOST_VERSION < 107000 110 Socket.get_io_service().post(bind(&Promise::set_value, &promise));
112 post(Socket.get_executor(), bind(&Promise::set_value, &promise));
114 promise.get_future().wait();
120 void ProcessHeader(
const boost::system::error_code & error, std::size_t bytes_transferred);
124 std::size_t GetHeaderSize()
const;
127 virtual void Send(
const char *
message, std::size_t size);
132 OpcTcpServer & TcpServer;
133 Server::OpcTcpMessages::SharedPtr MessageProcessor;
135 Common::Logger::SharedPtr Logger;
136 std::vector<char> Buffer;
139 OpcTcpConnection::OpcTcpConnection(tcp::socket socket, OpcTcpServer & tcpServer,
const Common::Logger::SharedPtr & logger)
141 , TcpServer(tcpServer)
148 OpcTcpConnection::SharedPtr
OpcTcpConnection::create(tcp::socket socket, OpcTcpServer & tcpServer, Services::SharedPtr uaServer,
const Common::Logger::SharedPtr & logger)
150 SharedPtr result = std::make_shared<OpcTcpConnection>(
std::move(socket), tcpServer, logger);
155 result->MessageProcessor = std::make_shared<Server::OpcTcpMessages>(uaServer, result, logger);
159 OpcTcpConnection::~OpcTcpConnection()
163 void OpcTcpConnection::Start()
168 void OpcTcpConnection::ReadNextData()
172 OpcTcpConnection::SharedPtr
self = shared_from_this();
173 async_read(Socket, buffer(Buffer), transfer_exactly(GetHeaderSize()),
174 [
self](
const boost::system::error_code & error, std::size_t bytes_transferred)
178 self->ProcessHeader(error, bytes_transferred);
181 catch (
const std::exception & exc)
183 LOG_WARN(self->Logger,
"opc_tcp_async | failed to process message header: {}", exc.what());
189 std::size_t OpcTcpConnection::GetHeaderSize()
const 194 void OpcTcpConnection::ProcessHeader(
const boost::system::error_code & error, std::size_t bytes_transferred)
198 LOG_ERROR(Logger,
"opc_tcp_async | error receiving message header: {}", error.message());
203 LOG_DEBUG(Logger,
"opc_tcp_async | received message header with size: {}", bytes_transferred);
208 messageStream >> header;
210 const std::size_t messageSize = header.
Size - GetHeaderSize();
212 LOG_DEBUG(Logger,
"opc_tcp_async | received message: Type: {}, ChunkType: {}, Size: {}: DataSize: {}", header.Type, header.Chunk, header.Size, messageSize);
216 OpcTcpConnection::SharedPtr
self = shared_from_this();
217 async_read(Socket, buffer(Buffer), transfer_exactly(messageSize),
218 [
self, header](
const boost::system::error_code & error, std::size_t bytesTransferred)
220 self->ProcessMessage(header.Type, error, bytesTransferred);
226 void OpcTcpConnection::ProcessMessage(
OpcUa::Binary::MessageType type,
const boost::system::error_code & error, std::size_t bytesTransferred)
230 LOG_ERROR(Logger,
"opc_tcp_async | error receiving message body: {}", error.message());
235 LOG_TRACE(Logger,
"opc_tcp_async | received message: {}",
ToHexDump(Buffer, bytesTransferred));
245 cont = MessageProcessor->ProcessMessage(type, messageStream);
248 catch (
const std::exception & exc)
250 LOG_ERROR(Logger,
"opc_tcp_async | failed to process message: {}", exc.what());
255 if (messageChannel.GetRemainSize())
257 std::cerr <<
"opc_tcp_async | ERROR!!! Message from client has been processed partially." << std::endl;
270 void OpcTcpConnection::GoodBye()
273 OpcTcpConnection::SharedPtr
self = shared_from_this();
274 TcpServer.RemoveClient(
self);
275 LOG_DEBUG(Logger,
"opc_tcp_async | good bye");
278 void OpcTcpConnection::Send(
const char *
message, std::size_t size)
280 std::shared_ptr<std::vector<char>> data = std::make_shared<std::vector<char>>(
message, message + size);
286 OpcTcpConnection::SharedPtr
self = shared_from_this();
287 async_write(Socket, buffer(&(*data)[0], data->size()), [
self, data](
const boost::system::error_code &
err,
size_t bytes)
291 LOG_ERROR(self->Logger,
"opc_tcp_async | failed to send data: {}", err.message());
296 LOG_DEBUG(self->Logger,
"opc_tcp_async | response sent");
300 OpcTcpServer::OpcTcpServer(
const AsyncOpcTcp::Parameters & params, Services::SharedPtr server, boost::asio::io_service & ioService,
const Common::Logger::SharedPtr & logger)
305 , acceptor(ioService)
309 if (params.Host.empty())
311 ep = tcp::endpoint(tcp::v4(), params.Port);
314 else if (params.Host ==
"localhost")
316 ep = tcp::endpoint(ip::address::from_string(
"127.0.0.1"), params.Port);
321 ep = tcp::endpoint(ip::address::from_string(params.Host), params.Port);
324 acceptor.open(ep.protocol());
325 acceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(
true));
329 void OpcTcpServer::Listen()
331 LOG_DEBUG(Logger,
"opc_tcp_async | running server");
333 LOG_DEBUG(Logger,
"opc_tcp_async | waiting for client connection at: {}:{}", acceptor.local_endpoint().address(), acceptor.local_endpoint().port());
341 LOG_DEBUG(Logger,
"opc_tcp_async | shutting down server");
351 typedef std::set<OpcTcpConnection::SharedPtr> OpcTcpConnectionSet;
352 OpcTcpConnectionSet tmp;
354 std::unique_lock<std::mutex> lock(Mutex);
355 tmp = OpcTcpConnectionSet(Clients);
362 for (
auto client : tmp)
369 std::unique_lock<std::mutex> lock(Mutex);
377 typedef std::promise<void> Promise;
379 #if BOOST_VERSION < 107000 380 acceptor.get_io_service().post(bind(&Promise::set_value, &promise));
382 post(acceptor.get_executor(), bind(&Promise::set_value, &promise));
384 promise.get_future().wait();
387 void OpcTcpServer::Accept()
391 acceptor.async_accept(socket, [
this](boost::system::error_code errorCode)
393 if (!acceptor.is_open())
400 LOG_DEBUG(Logger,
"opc_tcp_async | accepted new client connection");
403 std::unique_lock<std::mutex> lock(Mutex);
404 Clients.insert(connection);
411 LOG_WARN(Logger,
"opc_tcp_async | error during client connection: {}", errorCode.message());
418 catch (
const std::exception & exc)
420 LOG_WARN(Logger,
"opc_tcp_async | error accepting client connection: {}", exc.what());
424 void OpcTcpServer::RemoveClient(OpcTcpConnection::SharedPtr client)
426 std::unique_lock<std::mutex> lock(Mutex);
427 Clients.erase(client);
434 return AsyncOpcTcp::UniquePtr(
new OpcTcpServer(params, server, io, logger));
#define LOG_TRACE(__logger__,...)
#define LOG_WARN(__logger__,...)
Addon interface definition GNU LGPL.
Opc Ua computer interface. GNU LGPL.
#define LOG_ERROR(__logger__,...)
#define LOG_DEBUG(__logger__,...)
std_msgs::Header * header(M &m)
fmt::BufferedFile & move(fmt::BufferedFile &f)
std::shared_ptr< logger > create(const std::string &logger_name, const sink_ptr &sink)
OPC UA Address space part. GNU LGPL.
std::string ToHexDump(const char *buf, std::size_t size)
#define DEFINE_CLASS_POINTERS(ClassName)
Exception declarations GNU LGPL.
AsyncOpcTcp::UniquePtr CreateAsyncOpcTcp(const AsyncOpcTcp::Parameters ¶ms, Services::SharedPtr server, boost::asio::io_service &io, const Common::Logger::SharedPtr &logger)
std::size_t RawSize(const T &obj)