tcp_server.cpp
Go to the documentation of this file.
1 
11 #ifdef _WIN32
12 #include <windows.h>
13 #endif
14 
15 #include "tcp_server.h"
16 
17 #include "opcua_protocol.h"
18 #include "opc_tcp_processor.h"
19 #include <opc/ua/protocol/utils.h>
20 
21 #include <opc/common/thread.h>
22 #include <opc/common/uri_facade.h>
24 #include <opc/ua/socket_channel.h>
27 
28 
29 #include <errno.h>
30 #include <iostream>
31 #include <map>
32 #include <mutex>
33 #include <stdexcept>
34 #include <string.h>
35 #include <sys/types.h>
36 
37 
38 #ifdef _WIN32
39 #else
40 #include <arpa/inet.h>
41 #include <netdb.h>
42 #include <netinet/in.h>
43 #include <unistd.h>
44 
45 #include <sys/socket.h>
46 #endif
47 
48 namespace
49 {
50 using namespace OpcUa;
51 using namespace OpcUa::Binary;
52 using namespace OpcUa::Server;
53 
54 
55 class SocketHolder
56 {
57 public:
58  explicit SocketHolder(int socket, const Common::Logger::SharedPtr & logger)
59  : Socket(socket)
60  , Logger(logger)
61  {
62  }
63 
64  ~SocketHolder()
65  {
66  if (close(Socket) < 0)
67  {
68  LOG_ERROR(Logger, "unable to close server socket");
69  }
70  }
71 
72  bool operator < (int sock) const
73  {
74  return Socket < sock;
75  }
76 
77 private:
78  int Socket;
79  Common::Logger::SharedPtr Logger;
80 };
81 
82 
83 class Client : public Common::ThreadObserver
84 {
85 public:
86  Client(std::shared_ptr<IOChannel> channel, std::shared_ptr<IncomingConnectionProcessor> processor, std::function<void()> onFinish, const Common::Logger::SharedPtr & logger)
87  : Channel(channel)
88  , Processor(processor)
89  , OnFinish(onFinish)
90  , Logger(logger)
91  {
92  LOG_INFO(Logger, "starting new client thread");
93  std::function<void()> func = std::bind(&Client::Run, std::ref(*this));
94  ClientThread.reset(new Common::Thread(func));
95  }
96 
97  ~Client()
98  {
99  ClientThread->Join();
100  ClientThread.reset();
101  LOG_INFO(Logger, "client thread stopped");
102  }
103 
104 protected:
105  virtual void OnSuccess()
106  {
107  LOG_INFO(Logger, "server thread exited successfully");
108  }
109 
110  virtual void OnError(const std::exception & exc)
111  {
112  LOG_ERROR(Logger, "server thread terminated: {}", exc.what());
113  }
114 
115 private:
116  void Run()
117  {
118  try
119  {
120  LOG_INFO(Logger, "start to process client connection");
121  Processor->Process(Channel);
122  }
123 
124  catch (const std::exception & exc)
125  {
126  LOG_ERROR(Logger, "unable to process client connection: {}", exc.what());
127  }
128 
129  std::thread t(OnFinish);
130  t.detach();
131  }
132 
133 private:
134  std::shared_ptr<IOChannel> Channel;
135  std::shared_ptr<IncomingConnectionProcessor> Processor;
136  std::function<void()> OnFinish;
137  std::unique_ptr<Common::Thread> ClientThread;
138  Common::Logger::SharedPtr Logger;
139 };
140 
141 
142 class TcpServerConnection : private Common::ThreadObserver
143 {
144 public:
145  TcpServerConnection(const TcpParameters & params, std::shared_ptr<OpcUa::Server::IncomingConnectionProcessor> processor, const Common::Logger::SharedPtr & logger)
146  : Port(params.Port)
147  , Stopped(true)
148  , Socket(-1)
149  , Processor(processor)
150  , Logger(logger)
151  {
152  }
153 
154  virtual ~TcpServerConnection()
155  {
156  try
157  {
158  Stop();
159  }
160 
161  catch (const std::exception & exc)
162  {
163  LOG_ERROR(Logger, "unable to stop server: ", exc.what());
164  }
165  }
166 
167  virtual void Start()
168  {
169  if (!ServerThread)
170  {
171  StartNewThread();
172  }
173  }
174 
175  virtual void Stop()
176  {
177  if (ServerThread)
178  {
179  LOG_INFO(Logger, "shutting down opc ua binary server");
180  Stopped = true;
181  shutdown(Socket, SHUT_RDWR);
182  ServerThread->Join();
183  ServerThread.reset();
184  }
185  }
186 
187 protected:
188  virtual void OnSuccess()
189  {
190  LOG_INFO(Logger, "server thread exited successfully");
191  }
192 
193  virtual void OnError(const std::exception & exc)
194  {
195  LOG_ERROR(Logger, "server thread exited with error: {}", exc.what());
196  //throw 20;
197  }
198 
199 private:
200  void StartNewThread()
201  {
202  LOG_INFO(Logger, "starting new server thread");
203  Stopped = false;
204  std::function<void()> func = std::bind(&TcpServerConnection::Run, std::ref(*this));
205  ServerThread.reset(new Common::Thread(func, this));
206  }
207 
208  void Run()
209  {
210  Socket = socket(AF_INET, SOCK_STREAM, 0);
211 
212  if (Stopped)
213  {
214  return;
215  }
216 
217  if (Socket < 0)
218  {
219  throw std::logic_error(std::string("unable to create server socket: ") + strerror(errno));
220  }
221 
222  SocketHolder holder(Socket, Logger);
223  LOG_INFO(Logger, "listening on: {}", Port);
224 
225  sockaddr_in addr;
226  addr.sin_family = AF_INET;
227  addr.sin_port = htons(Port);
228  addr.sin_addr.s_addr = htonl(INADDR_ANY);
229 
230  if (bind(Socket, (sockaddr *)&addr, sizeof(addr)) < 0)
231  {
232  if (Stopped)
233  {
234  return;
235  }
236 
237  throw std::logic_error(std::string("unable bind socket: ") + strerror(errno));
238  }
239 
240  const unsigned ServerQueueSize = 5;
241  listen(Socket, ServerQueueSize);
242 
243 
244  while (!Stopped)
245  {
246  int clientSocket = accept(Socket, NULL, NULL);
247 
248  if (Stopped)
249  {
250  return;
251  }
252 
253  if (clientSocket < 0)
254  {
255  throw std::logic_error(std::string("unable to accept client connection: ") + strerror(errno));
256  }
257 
258  std::unique_lock<std::mutex> lock(ClientsMutex);
259  std::shared_ptr<IOChannel> clientChannel(new SocketChannel(clientSocket));
260  std::shared_ptr<Client> clientThread(new Client(clientChannel, Processor, std::bind(&TcpServerConnection::Erase, std::ref(*this), clientSocket), Logger));
261  ClientThreads.insert(std::make_pair(clientSocket, clientThread));
262  }
263 
264  ClientThreads.clear();
265  }
266 
267  void Erase(int client)
268  {
269  std::unique_lock<std::mutex> lock(ClientsMutex);
270 
271  if (!Stopped)
272  {
273  ClientThreads.erase(client);
274  }
275  }
276 
277 private:
278  const unsigned short Port;
279  volatile bool Stopped;
280  volatile int Socket;
281  std::shared_ptr<IncomingConnectionProcessor> Processor;
282  std::unique_ptr<Common::Thread> ServerThread;
283  std::mutex ClientsMutex;
284  std::map<int, std::shared_ptr<Client>> ClientThreads;
285  Common::Logger::SharedPtr Logger;
286 };
287 
288 class TcpServer : public OpcUa::Server::TcpServer
289 {
290 public:
292 
293  TcpServer(const Common::Logger::SharedPtr & logger)
294  : OpcUa::Server::TcpServer(logger)
295  {}
296 
297  void Listen(const OpcUa::Server::TcpParameters & params, std::shared_ptr<OpcUa::Server::IncomingConnectionProcessor> processor) override
298  {
299  if (Servers.find(params.Port) != std::end(Servers))
300  {
301  // TODO add portnumber into message.
302  throw std::logic_error("server on this port already exists");
303  }
304 
305  std::shared_ptr<TcpServerConnection> listener(new TcpServerConnection(params, processor, Logger));
306  listener->Start();
307  Servers.insert(std::make_pair(params.Port, listener));
308  }
309 
310  void StopListen(const OpcUa::Server::TcpParameters & params) override
311  {
312  ServersMap::iterator serverIt = Servers.find(params.Port);
313 
314  if (serverIt == std::end(Servers))
315  {
316  return;
317  }
318 
319  try
320  {
321  serverIt->second->Stop();
322  Servers.erase(serverIt);
323  }
324 
325  catch (const std::exception & exc)
326  {
327  LOG_ERROR(Logger, "stopping TcpServer on port: {} failed: {}", params.Port, exc.what());
328  }
329  }
330 
331 private:
332  typedef std::map<unsigned short, std::shared_ptr<TcpServerConnection>> ServersMap;
333  ServersMap Servers;
334 };
335 
336 } // namespace
337 
338 std::unique_ptr<OpcUa::Server::TcpServer> OpcUa::Server::CreateTcpServer(const Common::Logger::SharedPtr & logger)
339 {
340  return std::unique_ptr<OpcUa::Server::TcpServer>(new ::TcpServer(logger));
341 }
Addon interface definition GNU LGPL.
#define LOG_ERROR(__logger__,...)
Definition: common/logger.h:27
Definition: client.py:1
#define LOG_INFO(__logger__,...)
Definition: common/logger.h:25
OPC UA Address space part. GNU LGPL.
#define DEFINE_CLASS_POINTERS(ClassName)
Exception declarations GNU LGPL.
ROSCONSOLE_DECL void shutdown()
TcpServer::UniquePtr CreateTcpServer(const Common::Logger::SharedPtr &logger)
Definition: tcp_server.cpp:338


ros_opcua_impl_freeopcua
Author(s): Denis Štogl
autogenerated on Tue Jan 19 2021 03:12:08