opc_tcp_async.cpp
Go to the documentation of this file.
00001 /******************************************************************************
00002  *   Copyright (C) 2013-2014 by Alexander Rykovanov                        *
00003  *   rykovanov.as@gmail.com                                                   *
00004  *                                                                            *
00005  *   This library is free software; you can redistribute it and/or modify     *
00006  *   it under the terms of the GNU Lesser General Public License as           *
00007  *   published by the Free Software Foundation; version 3 of the License.     *
00008  *                                                                            *
00009  *   This library is distributed in the hope that it will be useful,          *
00010  *   but WITHOUT ANY WARRANTY; without even the implied warranty of           *
00011  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the            *
00012  *   GNU Lesser General Public License for more details.                      *
00013  *                                                                            *
00014  *   You should have received a copy of the GNU Lesser General Public License *
00015  *   along with this library; if not, write to the                            *
00016  *   Free Software Foundation, Inc.,                                          *
00017  *   59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.                *
00018  ******************************************************************************/
00019 
00020 #include "opc_tcp_processor.h"
00021 
00022 #include <opc/ua/server/opc_tcp_async.h>
00023 
00024 #include <opc/ua/protocol/utils.h>
00025 #include <opc/ua/protocol/binary/common.h>
00026 #include <opc/ua/protocol/binary/stream.h>
00027 #include <opc/ua/protocol/channel.h>
00028 #include <opc/ua/protocol/secure_channel.h>
00029 #include <opc/ua/protocol/input_from_buffer.h>
00030 
#include <array>
#include <boost/asio.hpp>
#include <iostream>
#include <memory>
#include <set>
00035 
00036 
00037 
00038 namespace
00039 {
00040 
00041   using namespace OpcUa;
00042   using namespace OpcUa::Binary;
00043   using namespace OpcUa;
00044 
00045   using namespace boost::asio;  
00046   using namespace boost::asio::ip;  
00047 
00048 
00049   class OpcTcpConnection;
00050 
00051   class OpcTcpServer : public OpcUa::Server::AsyncOpcTcp
00052   {
00053   public:
00054     DEFINE_CLASS_POINTERS(OpcTcpServer);
00055 
00056   public:
00057     OpcTcpServer(const AsyncOpcTcp::Parameters& params, Services::SharedPtr server, boost::asio::io_service& ioService);
00058 
00059     virtual void Listen() override;
00060     virtual void Shutdown() override;
00061 
00062   private:
00063     void Accept();
00064 
00065   private:// OpcTcpClient interface;
00066     friend class OpcTcpConnection;
00067     void RemoveClient(std::shared_ptr<OpcTcpConnection> client);
00068 
00069   private:
00070     Parameters Params;
00071     Services::SharedPtr Server;
00072     std::set<std::shared_ptr<OpcTcpConnection>> Clients;
00073 
00074     tcp::socket socket;
00075     tcp::acceptor acceptor;
00076   };
00077 
00078 
00079   class OpcTcpConnection : public std::enable_shared_from_this<OpcTcpConnection>, private OpcUa::OutputChannel
00080   {
00081   public:
00082     DEFINE_CLASS_POINTERS(OpcTcpConnection);
00083 
00084   public:
00085     OpcTcpConnection(tcp::socket socket, OpcTcpServer& tcpServer, Services::SharedPtr uaServer, bool debug);
00086     ~OpcTcpConnection();
00087 
00088     void Start();
00089 
00090     virtual void Stop()
00091     {
00092       Socket.close();
00093     }
00094 
00095 
00096   private:
00097     void ReadNextData();
00098     void ProcessHeader(const boost::system::error_code& error, std::size_t bytes_transferred);
00099     void ProcessMessage(OpcUa::Binary::MessageType type, const boost::system::error_code& error, std::size_t bytesTransferred);
00100     void GoodBye();
00101 
00102     std::size_t GetHeaderSize() const;
00103 
00104   private:
00105     virtual void Send(const char* message, std::size_t size);
00106     void FillResponseHeader(const RequestHeader& requestHeader, ResponseHeader& responseHeader) const;
00107 
00108   private:
00109     tcp::socket Socket;
00110     OpcTcpServer& TcpServer;
00111     Server::OpcTcpMessages MessageProcessor;
00112     OStreamBinary OStream;
00113     const bool Debug = false;
00114     std::vector<char> Buffer;
00115   };
00116 
00117   OpcTcpConnection::OpcTcpConnection(tcp::socket socket, OpcTcpServer& tcpServer, Services::SharedPtr uaServer, bool debug)
00118     : Socket(std::move(socket))
00119     , TcpServer(tcpServer)
00120     , MessageProcessor(uaServer, *this, debug)
00121     , OStream(*this)
00122     , Debug(debug)
00123     , Buffer(8192)
00124   {
00125   }
00126 
00127   OpcTcpConnection::~OpcTcpConnection()
00128   {
00129   }
00130 
00131   void OpcTcpConnection::Start()
00132   {
00133     ReadNextData();
00134   }
00135 
00136   void OpcTcpConnection::ReadNextData()
00137   {
00138     async_read(Socket, buffer(Buffer), transfer_exactly(GetHeaderSize()),
00139       [this](const boost::system::error_code& error, std::size_t bytes_transferred)
00140       {
00141         try
00142         {
00143           ProcessHeader(error, bytes_transferred);
00144         }
00145         catch (const std::exception& exc)
00146         {
00147           std::cerr << "opc_tcp_async| Failed to process message header: " << exc.what() << std::endl;
00148         }
00149       }
00150     );
00151   }
00152 
00153   std::size_t OpcTcpConnection::GetHeaderSize() const
00154   {
00155     return OpcUa::Binary::RawSize(OpcUa::Binary::Header());
00156   }
00157 
00158   void OpcTcpConnection::ProcessHeader(const boost::system::error_code& error, std::size_t bytes_transferred)
00159   {
00160     if (error)
00161     {
00162       std::cerr << "opc_tcp_async| Error during receiving message header: " << error.message() << std::endl;
00163       GoodBye();
00164       return;
00165     }
00166 
00167     if (Debug) std::cout << "opc_tcp_async| Received message header with size " << bytes_transferred << std::endl;
00168 
00169     OpcUa::InputFromBuffer messageChannel(&Buffer[0], bytes_transferred);
00170     IStreamBinary messageStream(messageChannel);
00171     OpcUa::Binary::Header header;
00172     messageStream >> header;
00173 
00174     const std::size_t messageSize = header.Size - GetHeaderSize();
00175 
00176     if (Debug)
00177     {
00178       std::cout << "opc_tcp_async| Message type: " << header.Type << std::endl;
00179       std::cout << "opc_tcp_async| Chunk type: " << header.Chunk << std::endl;
00180       std::cout << "opc_tcp_async| MessageSize: " << header.Size << std::endl;
00181       std::cout << "opc_tcp_async| Waiting " << messageSize << " bytes from client." << std::endl;
00182     }
00183 
00184     async_read(Socket, buffer(Buffer), transfer_exactly(messageSize),
00185         [this, header](const boost::system::error_code& error, std::size_t bytesTransferred)
00186         {
00187           if (error)
00188           {
00189             if (Debug) std::cerr << "opc_tcp_async| Error during receiving message body." << std::endl;
00190             return;
00191           }
00192           ProcessMessage(header.Type, error, bytesTransferred);
00193         }
00194     );
00195 
00196   }
00197 
00198   void OpcTcpConnection::ProcessMessage(OpcUa::Binary::MessageType type, const boost::system::error_code& error, std::size_t bytesTransferred)
00199   {
00200     if (error)
00201     {
00202       std::cerr << "opc_tcp_async| Error during receiving message body: " << error.message() << std::endl;
00203       GoodBye();
00204       return;
00205     }
00206 
00207     if (Debug)
00208     {
00209       if (Debug) std::cout << "opc_tcp_async| Received " << bytesTransferred << " bytes from client:" << std::endl;
00210       PrintBlob(Buffer, bytesTransferred);
00211     }
00212 
00213     // restrict server size code only with current message.
00214     OpcUa::InputFromBuffer messageChannel(&Buffer[0], bytesTransferred);
00215     IStreamBinary messageStream(messageChannel);
00216 
00217     bool cont = true;
00218 
00219     try
00220     {
00221       cont = MessageProcessor.ProcessMessage(type, messageStream);
00222     }
00223     catch(const std::exception& exc)
00224     {
00225       std::cerr << "opc_tcp_async| Failed to process message. " << exc.what() << std::endl;
00226       GoodBye();
00227       return;
00228     }
00229 
00230     if (messageChannel.GetRemainSize())
00231     {
00232       std::cerr << "opc_tcp_async| ERROR!!! Message from client has been processed partially." << std::endl;
00233     }
00234 
00235     if ( ! cont )
00236     {
00237       GoodBye();
00238       return;
00239     }
00240 
00241     ReadNextData();
00242   }
00243 
00244 
00245   void OpcTcpConnection::GoodBye()
00246   {
00247     TcpServer.RemoveClient(shared_from_this());
00248     // valgrind complains that Debug  cannot be read at that point, so do not use it
00249     //if (Debug) std::cout << "opc_tcp_async| Good bye." << std::endl;
00250   }
00251 
00252   void OpcTcpConnection::Send(const char* message, std::size_t size)
00253   {
00254     std::shared_ptr<std::vector<char>> data = std::make_shared<std::vector<char>>(message, message + size);
00255 
00256     if (Debug)
00257     {
00258       std::cout << "opc_tcp_async| Sending next data to the client:" << std::endl;
00259       PrintBlob(*data);
00260     }
00261 
00262     async_write(Socket, buffer(&(*data)[0], data->size()), [this, data](const boost::system::error_code & err, size_t bytes){
00263       if (err)
00264       {
00265         std::cerr << "opc_tcp_async| Failed to send data to the client. " << err.message() << std::endl;
00266         GoodBye();
00267         return;
00268       }
00269 
00270       if (Debug)
00271       {
00272         std::cout << "opc_tcp_async| Response sent to the client." << std::endl;
00273       }
00274     });
00275   }
00276 
00277   OpcTcpServer::OpcTcpServer(const AsyncOpcTcp::Parameters& params, Services::SharedPtr server, boost::asio::io_service& ioService)
00278     : Params(params)
00279     , Server(server)
00280     , socket(ioService)
00281     , acceptor(ioService)
00282   {
00283     tcp::endpoint ep;
00284     if (params.Host.empty() )
00285     {
00286       ep = tcp::endpoint( tcp::v4(), params.Port );
00287     }
00288     else if ( params.Host == "localhost" )
00289     {
00290       ep = tcp::endpoint( ip::address::from_string("127.0.0.1"), params.Port );
00291     }
00292     else
00293     {
00294       ep = tcp::endpoint( ip::address::from_string(params.Host), params.Port );
00295     }
00296     acceptor.open(ep.protocol());
00297     acceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
00298     acceptor.bind(ep);
00299   }
00300 
00301   void OpcTcpServer::Listen()
00302   {
00303     std::clog << "opc_tcp_async| Running server." << std::endl;
00304     Accept();
00305   }
00306 
00307   void OpcTcpServer::Shutdown()
00308   {
00309     std::clog << "opc_tcp_async| Shutting down server." << std::endl;
00310     Clients.clear();
00311     acceptor.close();
00312   }
00313 
00314   void OpcTcpServer::Accept()
00315   {
00316     try
00317     {
00318       std::cout << "opc_tcp_async| Waiting for client connection at: " << acceptor.local_endpoint().address() << ":" << acceptor.local_endpoint().port() <<  std::endl;
00319       acceptor.listen();
00320       acceptor.async_accept(socket, [this](boost::system::error_code errorCode){
00321         if (!errorCode)
00322         {
00323           std::cout << "opc_tcp_async| Accepted new client connection." << std::endl;
00324           std::shared_ptr<OpcTcpConnection> connection = std::make_shared<OpcTcpConnection>(std::move(socket), *this, Server, Params.DebugMode);
00325           Clients.insert(connection);
00326           connection->Start();
00327         }
00328         else
00329         {
00330           std::cout << "opc_tcp_async| Error during client connection: "<< errorCode.message() << std::endl;
00331         }
00332         Accept();
00333       });
00334     }
00335     catch (const std::exception& exc)
00336     {
00337       std::cout << "opc_tcp_async| Error accepting client connection: "<< exc.what() << std::endl;
00338     }
00339   }
00340 
00341   void OpcTcpServer::RemoveClient(OpcTcpConnection::SharedPtr client)
00342   {
00343     Clients.erase(client);
00344   }
00345 
00346 } // namespace
00347 
00348 OpcUa::Server::AsyncOpcTcp::UniquePtr OpcUa::Server::CreateAsyncOpcTcp(const OpcUa::Server::AsyncOpcTcp::Parameters& params, Services::SharedPtr server, boost::asio::io_service& io)
00349 {
00350   return AsyncOpcTcp::UniquePtr(new OpcTcpServer(params, server, io));
00351 }


ros_opcua_impl_freeopcua
Author(s): Denis Štogl
autogenerated on Sat Jun 8 2019 18:24:56