Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011 #ifdef _WIN32
00012 #include <windows.h>
00013 #endif
00014
00015 #include "tcp_server.h"
00016
00017 #include "opcua_protocol.h"
00018 #include "opc_tcp_processor.h"
00019 #include <opc/ua/protocol/utils.h>
00020
00021 #include <opc/common/thread.h>
00022 #include <opc/common/uri_facade.h>
00023 #include <opc/ua/services/services.h>
00024 #include <opc/ua/socket_channel.h>
00025 #include <opc/ua/protocol/binary/stream.h>
00026 #include <opc/ua/protocol/input_from_buffer.h>
00027
00028
00029 #include <errno.h>
00030 #include <iostream>
00031 #include <map>
00032 #include <mutex>
00033 #include <stdexcept>
00034 #include <string.h>
00035 #include <sys/types.h>
00036
00037
00038 #ifdef _WIN32
00039 #else
00040 #include <arpa/inet.h>
00041 #include <netdb.h>
00042 #include <netinet/in.h>
00043 #include <unistd.h>
00044
00045 #include <sys/socket.h>
00046 #endif
00047
00048 namespace
00049 {
00050 using namespace OpcUa;
00051 using namespace OpcUa::Binary;
00052 using namespace OpcUa::Server;
00053
00054
00055 class SocketHolder
00056 {
00057 public:
00058 explicit SocketHolder(int socket)
00059 : Socket(socket)
00060 {
00061 }
00062
00063 ~SocketHolder()
00064 {
00065 if (close(Socket) < 0)
00066 {
00067 std::cerr << "Unable to close server socket." << strerror(errno) << std::endl;
00068 }
00069 }
00070
00071 bool operator < (int sock) const
00072 {
00073 return Socket < sock;
00074 }
00075
00076 private:
00077 int Socket;
00078 };
00079
00080
00081 class Client : public Common::ThreadObserver
00082 {
00083 public:
00084 Client(std::shared_ptr<IOChannel> channel, std::shared_ptr<IncomingConnectionProcessor> processor, std::function<void()> onFinish)
00085 : Channel(channel)
00086 , Processor(processor)
00087 , OnFinish(onFinish)
00088 {
00089 std::clog << "Starting new client thread." << std::endl;
00090 std::function<void()> func = std::bind(&Client::Run, std::ref(*this));
00091 ClientThread.reset(new Common::Thread(func));
00092 }
00093
00094 ~Client()
00095 {
00096 ClientThread->Join();
00097 ClientThread.reset();
00098 std::clog << "Client thread stopped." << std::endl;
00099 }
00100
00101 protected:
00102 virtual void OnSuccess()
00103 {
00104 std::cerr << "Server thread was exited successfully." << std::endl;
00105 }
00106
00107 virtual void OnError(const std::exception& exc)
00108 {
00109 std::cerr << "Server thread has exited with error:" << exc.what() << std::endl;
00110 }
00111
00112 private:
00113 void Run()
00114 {
00115 try
00116 {
00117 std::cout << "start process" << std::endl;
00118 Processor->Process(Channel);
00119 }
00120 catch (const std::exception& exc)
00121 {
00122 std::cerr << "unable to process client connection. " << exc.what() << std::endl;
00123 }
00124 std::thread t(OnFinish);
00125 t.detach();
00126 }
00127
00128 private:
00129 std::shared_ptr<IOChannel> Channel;
00130 std::shared_ptr<IncomingConnectionProcessor> Processor;
00131 std::function<void()> OnFinish;
00132 std::unique_ptr<Common::Thread> ClientThread;
00133 };
00134
00135
00136 class TcpServerConnection : private Common::ThreadObserver
00137 {
00138 public:
00139 TcpServerConnection(const TcpParameters& params, std::shared_ptr<OpcUa::Server::IncomingConnectionProcessor> processor)
00140 : Port(params.Port)
00141 , Stopped(true)
00142 , Socket(-1)
00143 , Processor(processor)
00144 {
00145 }
00146
00147 virtual ~TcpServerConnection()
00148 {
00149 try
00150 {
00151 Stop();
00152 }
00153 catch (const std::exception& exc)
00154 {
00155 std::cerr << "Error unable to stop server. " << exc.what() << std::endl;
00156 }
00157 }
00158
00159 virtual void Start()
00160 {
00161 if (!ServerThread)
00162 {
00163 StartNewThread();
00164 }
00165 }
00166
00167 virtual void Stop()
00168 {
00169 if (ServerThread)
00170 {
00171 std::clog << "Shutting down opc ua binary server" << std::endl;
00172 Stopped = true;
00173 shutdown(Socket, SHUT_RDWR);
00174 ServerThread->Join();
00175 ServerThread.reset();
00176 }
00177 }
00178
00179 protected:
00180 virtual void OnSuccess()
00181 {
00182 std::cerr << "Server thread was exited successfully." << std::endl;
00183 }
00184
00185 virtual void OnError(const std::exception& exc)
00186 {
00187 std::cerr << "Server thread has exited with error:" << exc.what() << std::endl;
00188
00189 }
00190
00191 private:
00192 void StartNewThread()
00193 {
00194 std::clog << "Starting new server thread." << std::endl;
00195 Stopped = false;
00196 std::function<void()> func = std::bind(&TcpServerConnection::Run, std::ref(*this));
00197 ServerThread.reset(new Common::Thread(func, this));
00198 }
00199
00200 void Run()
00201 {
00202 Socket = socket(AF_INET, SOCK_STREAM, 0);
00203 if (Stopped)
00204 {
00205 return;
00206 }
00207 if (Socket < 0)
00208 {
00209 throw std::logic_error(std::string("Unable to create server socket. ") + strerror(errno));
00210 }
00211
00212 SocketHolder holder(Socket);
00213 std::cout << "Listening on: " << Port << std::endl;
00214
00215 sockaddr_in addr;
00216 addr.sin_family = AF_INET;
00217 addr.sin_port = htons(Port);
00218 addr.sin_addr.s_addr = htonl(INADDR_ANY);
00219 if (bind(Socket, (sockaddr*)&addr, sizeof(addr)) < 0)
00220 {
00221 if (Stopped)
00222 {
00223 return;
00224 }
00225 throw std::logic_error(std::string("Unable bind socket. ") + strerror(errno));
00226 }
00227
00228 const unsigned ServerQueueSize = 5;
00229 listen (Socket, ServerQueueSize);
00230
00231
00232 while (!Stopped)
00233 {
00234 int clientSocket = accept(Socket, NULL, NULL);
00235 if (Stopped)
00236 {
00237 return;
00238 }
00239
00240 if (clientSocket < 0)
00241 {
00242 throw std::logic_error(std::string("Unable to accept client connection. ") + strerror(errno));
00243 }
00244
00245 std::unique_lock<std::mutex> lock(ClientsMutex);
00246 std::shared_ptr<IOChannel> clientChannel(new SocketChannel(clientSocket));
00247 std::shared_ptr<Client> clientThread(new Client(clientChannel, Processor, std::bind(&TcpServerConnection::Erase, std::ref(*this), clientSocket)));
00248 ClientThreads.insert(std::make_pair(clientSocket, clientThread));
00249 }
00250
00251 ClientThreads.clear();
00252 }
00253
00254 void Erase(int client)
00255 {
00256 std::unique_lock<std::mutex> lock(ClientsMutex);
00257 if (!Stopped)
00258 {
00259 ClientThreads.erase(client);
00260 }
00261 }
00262
00263 private:
00264 const unsigned short Port;
00265 std::shared_ptr<IncomingConnectionProcessor> Processor;
00266 volatile bool Stopped;
00267 volatile int Socket;
00268 std::unique_ptr<Common::Thread> ServerThread;
00269 std::mutex ClientsMutex;
00270 std::map<int, std::shared_ptr<Client>> ClientThreads;
00271 };
00272
00273 class TcpServer : public OpcUa::Server::TcpServer
00274 {
00275 public:
00276 DEFINE_CLASS_POINTERS(TcpServer);
00277
00278 void Listen(const OpcUa::Server::TcpParameters& params, std::shared_ptr<OpcUa::Server::IncomingConnectionProcessor> processor) override
00279 {
00280 if (Servers.find(params.Port) != std::end(Servers))
00281 {
00282
00283 throw std::logic_error("Server already started at this port.");
00284 }
00285
00286 std::shared_ptr<TcpServerConnection> listener(new TcpServerConnection(params, processor));
00287 listener->Start();
00288 Servers.insert(std::make_pair(params.Port, listener));
00289 }
00290
00291 void StopListen(const OpcUa::Server::TcpParameters& params) override
00292 {
00293 ServersMap::iterator serverIt = Servers.find(params.Port);
00294 if (serverIt == std::end(Servers))
00295 {
00296 return;
00297 }
00298
00299 try
00300 {
00301 serverIt->second->Stop();
00302 Servers.erase(serverIt);
00303 }
00304 catch (const std::exception& exc)
00305 {
00306
00307 std::clog << "Stopping TcpServerAddon failed with error: " << exc.what() << std::endl;
00308 }
00309 }
00310
00311 private:
00312 typedef std::map<unsigned short, std::shared_ptr<TcpServerConnection>> ServersMap;
00313 ServersMap Servers;
00314 };
00315
00316 }
00317
00318 std::unique_ptr<OpcUa::Server::TcpServer> OpcUa::Server::CreateTcpServer()
00319 {
00320 return std::unique_ptr<OpcUa::Server::TcpServer>(new ::TcpServer());
00321 }