00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #include "opc_tcp_processor.h"
00021
00022 #include <opc/ua/server/opc_tcp_async.h>
00023
00024 #include <opc/ua/protocol/utils.h>
00025 #include <opc/ua/protocol/binary/common.h>
00026 #include <opc/ua/protocol/binary/stream.h>
00027 #include <opc/ua/protocol/channel.h>
00028 #include <opc/ua/protocol/secure_channel.h>
00029 #include <opc/ua/protocol/input_from_buffer.h>
00030
00031 #include <array>
00032 #include <boost/asio.hpp>
00033 #include <iostream>
00034 #include <set>
00035
00036
00037
00038 namespace
00039 {
00040
00041 using namespace OpcUa;
00042 using namespace OpcUa::Binary;
00043 using namespace OpcUa;
00044
00045 using namespace boost::asio;
00046 using namespace boost::asio::ip;
00047
00048
00049 class OpcTcpConnection;
00050
00051 class OpcTcpServer : public OpcUa::Server::AsyncOpcTcp
00052 {
00053 public:
00054 DEFINE_CLASS_POINTERS(OpcTcpServer);
00055
00056 public:
00057 OpcTcpServer(const AsyncOpcTcp::Parameters& params, Services::SharedPtr server, boost::asio::io_service& ioService);
00058
00059 virtual void Listen() override;
00060 virtual void Shutdown() override;
00061
00062 private:
00063 void Accept();
00064
00065 private:
00066 friend class OpcTcpConnection;
00067 void RemoveClient(std::shared_ptr<OpcTcpConnection> client);
00068
00069 private:
00070 Parameters Params;
00071 Services::SharedPtr Server;
00072 std::set<std::shared_ptr<OpcTcpConnection>> Clients;
00073
00074 tcp::socket socket;
00075 tcp::acceptor acceptor;
00076 };
00077
00078
00079 class OpcTcpConnection : public std::enable_shared_from_this<OpcTcpConnection>, private OpcUa::OutputChannel
00080 {
00081 public:
00082 DEFINE_CLASS_POINTERS(OpcTcpConnection);
00083
00084 public:
00085 OpcTcpConnection(tcp::socket socket, OpcTcpServer& tcpServer, Services::SharedPtr uaServer, bool debug);
00086 ~OpcTcpConnection();
00087
00088 void Start();
00089
00090 virtual void Stop()
00091 {
00092 Socket.close();
00093 }
00094
00095
00096 private:
00097 void ReadNextData();
00098 void ProcessHeader(const boost::system::error_code& error, std::size_t bytes_transferred);
00099 void ProcessMessage(OpcUa::Binary::MessageType type, const boost::system::error_code& error, std::size_t bytesTransferred);
00100 void GoodBye();
00101
00102 std::size_t GetHeaderSize() const;
00103
00104 private:
00105 virtual void Send(const char* message, std::size_t size);
00106 void FillResponseHeader(const RequestHeader& requestHeader, ResponseHeader& responseHeader) const;
00107
00108 private:
00109 tcp::socket Socket;
00110 OpcTcpServer& TcpServer;
00111 Server::OpcTcpMessages MessageProcessor;
00112 OStreamBinary OStream;
00113 const bool Debug = false;
00114 std::vector<char> Buffer;
00115 };
00116
00117 OpcTcpConnection::OpcTcpConnection(tcp::socket socket, OpcTcpServer& tcpServer, Services::SharedPtr uaServer, bool debug)
00118 : Socket(std::move(socket))
00119 , TcpServer(tcpServer)
00120 , MessageProcessor(uaServer, *this, debug)
00121 , OStream(*this)
00122 , Debug(debug)
00123 , Buffer(8192)
00124 {
00125 }
00126
00127 OpcTcpConnection::~OpcTcpConnection()
00128 {
00129 }
00130
00131 void OpcTcpConnection::Start()
00132 {
00133 ReadNextData();
00134 }
00135
00136 void OpcTcpConnection::ReadNextData()
00137 {
00138 async_read(Socket, buffer(Buffer), transfer_exactly(GetHeaderSize()),
00139 [this](const boost::system::error_code& error, std::size_t bytes_transferred)
00140 {
00141 try
00142 {
00143 ProcessHeader(error, bytes_transferred);
00144 }
00145 catch (const std::exception& exc)
00146 {
00147 std::cerr << "opc_tcp_async| Failed to process message header: " << exc.what() << std::endl;
00148 }
00149 }
00150 );
00151 }
00152
00153 std::size_t OpcTcpConnection::GetHeaderSize() const
00154 {
00155 return OpcUa::Binary::RawSize(OpcUa::Binary::Header());
00156 }
00157
00158 void OpcTcpConnection::ProcessHeader(const boost::system::error_code& error, std::size_t bytes_transferred)
00159 {
00160 if (error)
00161 {
00162 std::cerr << "opc_tcp_async| Error during receiving message header: " << error.message() << std::endl;
00163 GoodBye();
00164 return;
00165 }
00166
00167 if (Debug) std::cout << "opc_tcp_async| Received message header with size " << bytes_transferred << std::endl;
00168
00169 OpcUa::InputFromBuffer messageChannel(&Buffer[0], bytes_transferred);
00170 IStreamBinary messageStream(messageChannel);
00171 OpcUa::Binary::Header header;
00172 messageStream >> header;
00173
00174 const std::size_t messageSize = header.Size - GetHeaderSize();
00175
00176 if (Debug)
00177 {
00178 std::cout << "opc_tcp_async| Message type: " << header.Type << std::endl;
00179 std::cout << "opc_tcp_async| Chunk type: " << header.Chunk << std::endl;
00180 std::cout << "opc_tcp_async| MessageSize: " << header.Size << std::endl;
00181 std::cout << "opc_tcp_async| Waiting " << messageSize << " bytes from client." << std::endl;
00182 }
00183
00184 async_read(Socket, buffer(Buffer), transfer_exactly(messageSize),
00185 [this, header](const boost::system::error_code& error, std::size_t bytesTransferred)
00186 {
00187 if (error)
00188 {
00189 if (Debug) std::cerr << "opc_tcp_async| Error during receiving message body." << std::endl;
00190 return;
00191 }
00192 ProcessMessage(header.Type, error, bytesTransferred);
00193 }
00194 );
00195
00196 }
00197
00198 void OpcTcpConnection::ProcessMessage(OpcUa::Binary::MessageType type, const boost::system::error_code& error, std::size_t bytesTransferred)
00199 {
00200 if (error)
00201 {
00202 std::cerr << "opc_tcp_async| Error during receiving message body: " << error.message() << std::endl;
00203 GoodBye();
00204 return;
00205 }
00206
00207 if (Debug)
00208 {
00209 if (Debug) std::cout << "opc_tcp_async| Received " << bytesTransferred << " bytes from client:" << std::endl;
00210 PrintBlob(Buffer, bytesTransferred);
00211 }
00212
00213
00214 OpcUa::InputFromBuffer messageChannel(&Buffer[0], bytesTransferred);
00215 IStreamBinary messageStream(messageChannel);
00216
00217 bool cont = true;
00218
00219 try
00220 {
00221 cont = MessageProcessor.ProcessMessage(type, messageStream);
00222 }
00223 catch(const std::exception& exc)
00224 {
00225 std::cerr << "opc_tcp_async| Failed to process message. " << exc.what() << std::endl;
00226 GoodBye();
00227 return;
00228 }
00229
00230 if (messageChannel.GetRemainSize())
00231 {
00232 std::cerr << "opc_tcp_async| ERROR!!! Message from client has been processed partially." << std::endl;
00233 }
00234
00235 if ( ! cont )
00236 {
00237 GoodBye();
00238 return;
00239 }
00240
00241 ReadNextData();
00242 }
00243
00244
00245 void OpcTcpConnection::GoodBye()
00246 {
00247 TcpServer.RemoveClient(shared_from_this());
00248
00249
00250 }
00251
00252 void OpcTcpConnection::Send(const char* message, std::size_t size)
00253 {
00254 std::shared_ptr<std::vector<char>> data = std::make_shared<std::vector<char>>(message, message + size);
00255
00256 if (Debug)
00257 {
00258 std::cout << "opc_tcp_async| Sending next data to the client:" << std::endl;
00259 PrintBlob(*data);
00260 }
00261
00262 async_write(Socket, buffer(&(*data)[0], data->size()), [this, data](const boost::system::error_code & err, size_t bytes){
00263 if (err)
00264 {
00265 std::cerr << "opc_tcp_async| Failed to send data to the client. " << err.message() << std::endl;
00266 GoodBye();
00267 return;
00268 }
00269
00270 if (Debug)
00271 {
00272 std::cout << "opc_tcp_async| Response sent to the client." << std::endl;
00273 }
00274 });
00275 }
00276
00277 OpcTcpServer::OpcTcpServer(const AsyncOpcTcp::Parameters& params, Services::SharedPtr server, boost::asio::io_service& ioService)
00278 : Params(params)
00279 , Server(server)
00280 , socket(ioService)
00281 , acceptor(ioService)
00282 {
00283 tcp::endpoint ep;
00284 if (params.Host.empty() )
00285 {
00286 ep = tcp::endpoint( tcp::v4(), params.Port );
00287 }
00288 else if ( params.Host == "localhost" )
00289 {
00290 ep = tcp::endpoint( ip::address::from_string("127.0.0.1"), params.Port );
00291 }
00292 else
00293 {
00294 ep = tcp::endpoint( ip::address::from_string(params.Host), params.Port );
00295 }
00296 acceptor.open(ep.protocol());
00297 acceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
00298 acceptor.bind(ep);
00299 }
00300
00301 void OpcTcpServer::Listen()
00302 {
00303 std::clog << "opc_tcp_async| Running server." << std::endl;
00304 Accept();
00305 }
00306
00307 void OpcTcpServer::Shutdown()
00308 {
00309 std::clog << "opc_tcp_async| Shutting down server." << std::endl;
00310 Clients.clear();
00311 acceptor.close();
00312 }
00313
00314 void OpcTcpServer::Accept()
00315 {
00316 try
00317 {
00318 std::cout << "opc_tcp_async| Waiting for client connection at: " << acceptor.local_endpoint().address() << ":" << acceptor.local_endpoint().port() << std::endl;
00319 acceptor.listen();
00320 acceptor.async_accept(socket, [this](boost::system::error_code errorCode){
00321 if (!errorCode)
00322 {
00323 std::cout << "opc_tcp_async| Accepted new client connection." << std::endl;
00324 std::shared_ptr<OpcTcpConnection> connection = std::make_shared<OpcTcpConnection>(std::move(socket), *this, Server, Params.DebugMode);
00325 Clients.insert(connection);
00326 connection->Start();
00327 }
00328 else
00329 {
00330 std::cout << "opc_tcp_async| Error during client connection: "<< errorCode.message() << std::endl;
00331 }
00332 Accept();
00333 });
00334 }
00335 catch (const std::exception& exc)
00336 {
00337 std::cout << "opc_tcp_async| Error accepting client connection: "<< exc.what() << std::endl;
00338 }
00339 }
00340
00341 void OpcTcpServer::RemoveClient(OpcTcpConnection::SharedPtr client)
00342 {
00343 Clients.erase(client);
00344 }
00345
00346 }
00347
00348 OpcUa::Server::AsyncOpcTcp::UniquePtr OpcUa::Server::CreateAsyncOpcTcp(const OpcUa::Server::AsyncOpcTcp::Parameters& params, Services::SharedPtr server, boost::asio::io_service& io)
00349 {
00350 return AsyncOpcTcp::UniquePtr(new OpcTcpServer(params, server, io));
00351 }