tcp_server.cpp
Go to the documentation of this file.
00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 #ifdef _WIN32
00012 #include <windows.h>
00013 #endif
00014 
00015 #include "tcp_server.h"
00016 
00017 #include "opcua_protocol.h"
00018 #include "opc_tcp_processor.h"
00019 #include <opc/ua/protocol/utils.h>
00020 
00021 #include <opc/common/thread.h>
00022 #include <opc/common/uri_facade.h>
00023 #include <opc/ua/services/services.h>
00024 #include <opc/ua/socket_channel.h>
00025 #include <opc/ua/protocol/binary/stream.h>
00026 #include <opc/ua/protocol/input_from_buffer.h>
00027 
00028 
#include <atomic>
#include <errno.h>
#include <iostream>
#include <map>
#include <mutex>
#include <stdexcept>
#include <string>
#include <string.h>
#include <sys/types.h>
00036 
00037 
00038 #ifdef _WIN32
00039 #else
00040 #include <arpa/inet.h>
00041 #include <netdb.h>
00042 #include <netinet/in.h>
00043 #include <unistd.h>
00044 
00045 #include <sys/socket.h>
00046 #endif
00047 
00048 namespace
00049 {
00050   using namespace OpcUa;
00051   using namespace OpcUa::Binary;
00052   using namespace OpcUa::Server;
00053 
00054  
00055   class SocketHolder
00056   {
00057   public:
00058     explicit SocketHolder(int socket)
00059       : Socket(socket)
00060     {
00061     }
00062 
00063     ~SocketHolder()
00064     {
00065       if (close(Socket) < 0)
00066       {
00067         std::cerr << "Unable to close server socket." << strerror(errno) << std::endl;
00068       }
00069     }
00070 
00071     bool operator < (int sock) const
00072     {
00073       return Socket < sock;
00074     }
00075 
00076   private:
00077     int Socket;
00078   };
00079 
00080 
00081   class Client : public Common::ThreadObserver
00082   {
00083   public:
00084     Client(std::shared_ptr<IOChannel> channel, std::shared_ptr<IncomingConnectionProcessor> processor, std::function<void()> onFinish)
00085       : Channel(channel)
00086       , Processor(processor)
00087       , OnFinish(onFinish)
00088     {
00089       std::clog << "Starting new client thread." << std::endl;
00090       std::function<void()> func = std::bind(&Client::Run, std::ref(*this));
00091       ClientThread.reset(new Common::Thread(func));
00092     }
00093 
00094     ~Client()
00095     {
00096       ClientThread->Join();
00097       ClientThread.reset();
00098       std::clog << "Client thread stopped." << std::endl;
00099     }
00100 
00101   protected:
00102     virtual void OnSuccess()
00103     {
00104       std::cerr << "Server thread was exited successfully." << std::endl;
00105     }
00106 
00107     virtual void OnError(const std::exception& exc)
00108     {
00109       std::cerr << "Server thread has exited with error:" << exc.what() << std::endl;
00110     }   
00111 
00112   private:
00113     void Run()
00114     {
00115       try
00116       {
00117         std::cout << "start process" << std::endl;
00118         Processor->Process(Channel); 
00119       }
00120       catch (const std::exception& exc)
00121       {
00122         std::cerr << "unable to process client connection. " << exc.what() << std::endl;
00123       }
00124       std::thread t(OnFinish);
00125       t.detach();
00126     }
00127 
00128   private:
00129     std::shared_ptr<IOChannel> Channel;
00130     std::shared_ptr<IncomingConnectionProcessor> Processor;
00131     std::function<void()> OnFinish;
00132     std::unique_ptr<Common::Thread> ClientThread;
00133   };
00134 
00135 
00136   class TcpServerConnection : private Common::ThreadObserver
00137   {
00138   public:
00139     TcpServerConnection(const TcpParameters& params, std::shared_ptr<OpcUa::Server::IncomingConnectionProcessor> processor)
00140       : Port(params.Port)
00141       , Stopped(true)
00142       , Socket(-1)
00143       , Processor(processor)
00144     {
00145     }
00146 
00147     virtual ~TcpServerConnection()
00148     {
00149       try
00150       {
00151         Stop();
00152       }
00153       catch (const std::exception& exc)
00154       {
00155         std::cerr << "Error unable to stop server. " << exc.what() << std::endl;
00156       }
00157     }
00158 
00159     virtual void Start()
00160     {
00161       if (!ServerThread)
00162       {
00163         StartNewThread();
00164       }
00165     }
00166 
00167     virtual void Stop()
00168     {
00169       if (ServerThread)
00170       {
00171         std::clog << "Shutting down opc ua binary server" << std::endl;
00172         Stopped = true;
00173         shutdown(Socket, SHUT_RDWR);
00174         ServerThread->Join();
00175         ServerThread.reset();
00176       }
00177     }
00178 
00179   protected:
00180     virtual void OnSuccess()
00181     {
00182       std::cerr << "Server thread was exited successfully." << std::endl;
00183     }
00184 
00185     virtual void OnError(const std::exception& exc)
00186     {
00187       std::cerr << "Server thread has exited with error:" << exc.what() << std::endl;
00188       //throw 20;
00189     }   
00190 
00191   private:
00192     void StartNewThread()
00193     {
00194       std::clog << "Starting new server thread." << std::endl;
00195       Stopped = false;
00196       std::function<void()> func = std::bind(&TcpServerConnection::Run, std::ref(*this));
00197       ServerThread.reset(new Common::Thread(func, this));
00198     }
00199 
00200     void Run()
00201     {
00202       Socket = socket(AF_INET, SOCK_STREAM, 0);
00203       if (Stopped)
00204       {
00205         return;
00206       }
00207       if (Socket  < 0)
00208       {
00209         throw std::logic_error(std::string("Unable to create server socket. ") + strerror(errno));
00210       }
00211 
00212       SocketHolder holder(Socket);
00213       std::cout << "Listening on: " << Port << std::endl;
00214 
00215       sockaddr_in addr;
00216       addr.sin_family = AF_INET;
00217       addr.sin_port = htons(Port);
00218       addr.sin_addr.s_addr = htonl(INADDR_ANY);
00219       if (bind(Socket, (sockaddr*)&addr, sizeof(addr)) < 0)
00220       {
00221         if (Stopped)
00222         {
00223           return;
00224         }
00225         throw std::logic_error(std::string("Unable bind socket. ") + strerror(errno));
00226       }
00227 
00228       const unsigned ServerQueueSize = 5;
00229       listen (Socket, ServerQueueSize);
00230  
00231      
00232       while (!Stopped)
00233       {
00234         int clientSocket = accept(Socket, NULL, NULL);
00235         if (Stopped)
00236         {
00237           return;
00238         }
00239 
00240         if (clientSocket < 0)
00241         {
00242           throw std::logic_error(std::string("Unable to accept client connection. ") + strerror(errno));
00243         }
00244 
00245         std::unique_lock<std::mutex> lock(ClientsMutex);
00246         std::shared_ptr<IOChannel> clientChannel(new SocketChannel(clientSocket));
00247         std::shared_ptr<Client> clientThread(new Client(clientChannel, Processor, std::bind(&TcpServerConnection::Erase, std::ref(*this), clientSocket)));
00248         ClientThreads.insert(std::make_pair(clientSocket, clientThread));
00249       }
00250 
00251       ClientThreads.clear();
00252     }
00253 
00254     void Erase(int client)
00255     {
00256       std::unique_lock<std::mutex> lock(ClientsMutex);
00257       if (!Stopped)
00258       {
00259         ClientThreads.erase(client);
00260       }
00261     }
00262 
00263   private:
00264     const unsigned short Port;
00265     std::shared_ptr<IncomingConnectionProcessor> Processor;
00266     volatile bool Stopped;
00267     volatile int Socket;
00268     std::unique_ptr<Common::Thread> ServerThread;
00269     std::mutex ClientsMutex;
00270     std::map<int, std::shared_ptr<Client>> ClientThreads;
00271   };
00272 
00273   class TcpServer : public OpcUa::Server::TcpServer
00274   {
00275   public:
00276     DEFINE_CLASS_POINTERS(TcpServer);
00277 
00278     void Listen(const OpcUa::Server::TcpParameters& params, std::shared_ptr<OpcUa::Server::IncomingConnectionProcessor> processor) override
00279     {
00280       if (Servers.find(params.Port) != std::end(Servers))
00281       {
00282         // TODO add portnumber into message.
00283         throw std::logic_error("Server already started at this port.");
00284       }
00285 
00286       std::shared_ptr<TcpServerConnection> listener(new TcpServerConnection(params, processor));
00287       listener->Start();
00288       Servers.insert(std::make_pair(params.Port, listener));
00289     }
00290 
00291     void StopListen(const OpcUa::Server::TcpParameters& params) override
00292     {
00293       ServersMap::iterator serverIt = Servers.find(params.Port);
00294       if (serverIt == std::end(Servers))
00295       {
00296         return;
00297       }
00298 
00299       try
00300       {
00301         serverIt->second->Stop();
00302         Servers.erase(serverIt);
00303       }
00304       catch (const std::exception& exc)
00305       {
00306         // TODO add port number to the message
00307         std::clog << "Stopping TcpServerAddon failed with error: " << exc.what() << std::endl;
00308       }
00309     }
00310 
00311   private:
00312     typedef std::map<unsigned short, std::shared_ptr<TcpServerConnection>> ServersMap;
00313     ServersMap Servers;
00314   };
00315 
00316 } // namespace
00317 
00318 std::unique_ptr<OpcUa::Server::TcpServer> OpcUa::Server::CreateTcpServer()
00319 {
00320   return std::unique_ptr<OpcUa::Server::TcpServer>(new ::TcpServer());
00321 }


ros_opcua_impl_freeopcua
Author(s): Denis Štogl
autogenerated on Sat Jun 8 2019 18:24:57