opc_tcp_async.cpp
Go to the documentation of this file.
1 /******************************************************************************
2  * Copyright (C) 2013-2014 by Alexander Rykovanov *
3  * rykovanov.as@gmail.com *
4  * *
5  * This library is free software; you can redistribute it and/or modify *
6  * it under the terms of the GNU Lesser General Public License as *
7  * published by the Free Software Foundation; version 3 of the License. *
8  * *
9  * This library is distributed in the hope that it will be useful, *
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of *
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
12  * GNU Lesser General Public License for more details. *
13  * *
14  * You should have received a copy of the GNU Lesser General Public License *
15  * along with this library; if not, write to the *
16  * Free Software Foundation, Inc., *
17  * 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. *
18  ******************************************************************************/
19 
20 #include "opc_tcp_processor.h"
21 
23 
24 #include <opc/ua/protocol/utils.h>
30 
31 #include <array>
32 #include <boost/asio.hpp>
33 #include <future>
34 #include <iostream>
35 #include <set>
36 
37 
38 
39 namespace
40 {
41 
42 using namespace OpcUa;
43 using namespace OpcUa::Binary;
44 using namespace OpcUa;
45 
46 using namespace boost::asio;
47 using namespace boost::asio::ip;
48 
49 
50 class OpcTcpConnection;
51 
52 class OpcTcpServer : public OpcUa::Server::AsyncOpcTcp
53 {
54 public:
55  DEFINE_CLASS_POINTERS(OpcTcpServer)
56 
57 public:
58  OpcTcpServer(const AsyncOpcTcp::Parameters & params, Services::SharedPtr server, boost::asio::io_service & ioService, const Common::Logger::SharedPtr & logger);
59 
60  virtual void Listen() override;
61  virtual void Shutdown() override;
62 
63 private:
64  void Accept();
65 
66 private:// OpcTcpClient interface;
67  friend class OpcTcpConnection;
68  void RemoveClient(std::shared_ptr<OpcTcpConnection> client);
69 
70 private:
71  Parameters Params;
72  Services::SharedPtr Server;
73  Common::Logger::SharedPtr Logger;
74  std::mutex Mutex;
75  std::set<std::shared_ptr<OpcTcpConnection>> Clients;
76 
77  tcp::socket socket;
78  tcp::acceptor acceptor;
79 };
80 
81 
82 class OpcTcpConnection : public std::enable_shared_from_this<OpcTcpConnection>, public OpcUa::OutputChannel
83 {
84 public:
85  DEFINE_CLASS_POINTERS(OpcTcpConnection)
86 
87 public:
88  // Even if this is a public constructor do not use it - use OpcTcpConnection::create().
89  // This constructor is needed for make_shared() which is needed
90  // to be able to use instances of OpcTcpConnection as
91  // OpcTcpConnection::SharedPtr and OpcUa::OutputChannel::SharedPtr
92  // at the same time.
93  OpcTcpConnection(tcp::socket socket, OpcTcpServer & tcpServer, const Common::Logger::SharedPtr & logger);
94  static SharedPtr create(tcp::socket socket, OpcTcpServer & tcpServer, Services::SharedPtr uaServer, const Common::Logger::SharedPtr & logger);
95  ~OpcTcpConnection();
96 
97  void Start();
98 
99  virtual void Stop()
100  {
101  Socket.close();
102 
103  /* queue a dummy operation to io_service to make sure we do not return
104  * until all existing async io requests of this instance are actually
105  * processed
106  */
107  typedef std::promise<void> Promise;
108  Promise promise;
109 #if BOOST_VERSION < 107000
110  Socket.get_io_service().post(bind(&Promise::set_value, &promise));
111 #else
112  post(Socket.get_executor(), bind(&Promise::set_value, &promise));
113 #endif
114  promise.get_future().wait();
115  }
116 
117 
118 private:
119  void ReadNextData();
120  void ProcessHeader(const boost::system::error_code & error, std::size_t bytes_transferred);
121  void ProcessMessage(OpcUa::Binary::MessageType type, const boost::system::error_code & error, std::size_t bytesTransferred);
122  void GoodBye();
123 
124  std::size_t GetHeaderSize() const;
125 
126 private:
127  virtual void Send(const char * message, std::size_t size);
128  void FillResponseHeader(const RequestHeader & requestHeader, ResponseHeader & responseHeader) const;
129 
130 private:
131  tcp::socket Socket;
132  OpcTcpServer & TcpServer;
133  Server::OpcTcpMessages::SharedPtr MessageProcessor;
135  Common::Logger::SharedPtr Logger;
136  std::vector<char> Buffer;
137 };
138 
139 OpcTcpConnection::OpcTcpConnection(tcp::socket socket, OpcTcpServer & tcpServer, const Common::Logger::SharedPtr & logger)
140  : Socket(std::move(socket))
141  , TcpServer(tcpServer)
142  , OStream(*this)
143  , Logger(logger)
144  , Buffer(8192)
145 {
146 }
147 
148 OpcTcpConnection::SharedPtr OpcTcpConnection::create(tcp::socket socket, OpcTcpServer & tcpServer, Services::SharedPtr uaServer, const Common::Logger::SharedPtr & logger)
149 {
150  SharedPtr result = std::make_shared<OpcTcpConnection>(std::move(socket), tcpServer, logger);
151 
152  // you must not take a shared_ptr in a constructor
153  // to give OpcTcpConnection as a shared_ptr to MessageProcessor
154  // we have to add this helper function
155  result->MessageProcessor = std::make_shared<Server::OpcTcpMessages>(uaServer, result, logger);
156  return result;
157 }
158 
159 OpcTcpConnection::~OpcTcpConnection()
160 {
161 }
162 
163 void OpcTcpConnection::Start()
164 {
165  ReadNextData();
166 }
167 
168 void OpcTcpConnection::ReadNextData()
169 {
170  // do not lose reference to shared instance even if another
171  // async operation decides to call GoodBye()
172  OpcTcpConnection::SharedPtr self = shared_from_this();
173  async_read(Socket, buffer(Buffer), transfer_exactly(GetHeaderSize()),
174  [self](const boost::system::error_code & error, std::size_t bytes_transferred)
175  {
176  try
177  {
178  self->ProcessHeader(error, bytes_transferred);
179  }
180 
181  catch (const std::exception & exc)
182  {
183  LOG_WARN(self->Logger, "opc_tcp_async | failed to process message header: {}", exc.what());
184  }
185  }
186  );
187 }
188 
189 std::size_t OpcTcpConnection::GetHeaderSize() const
190 {
192 }
193 
194 void OpcTcpConnection::ProcessHeader(const boost::system::error_code & error, std::size_t bytes_transferred)
195 {
196  if (error)
197  {
198  LOG_ERROR(Logger, "opc_tcp_async | error receiving message header: {}", error.message());
199  GoodBye();
200  return;
201  }
202 
203  LOG_DEBUG(Logger, "opc_tcp_async | received message header with size: {}", bytes_transferred);
204 
205  OpcUa::InputFromBuffer messageChannel(&Buffer[0], bytes_transferred);
206  IStreamBinary messageStream(messageChannel);
208  messageStream >> header;
209 
210  const std::size_t messageSize = header.Size - GetHeaderSize();
211 
212  LOG_DEBUG(Logger, "opc_tcp_async | received message: Type: {}, ChunkType: {}, Size: {}: DataSize: {}", header.Type, header.Chunk, header.Size, messageSize);
213 
214  // do not lose reference to shared instance even if another
215  // async operation decides to call GoodBye()
216  OpcTcpConnection::SharedPtr self = shared_from_this();
217  async_read(Socket, buffer(Buffer), transfer_exactly(messageSize),
218  [self, header](const boost::system::error_code & error, std::size_t bytesTransferred)
219  {
220  self->ProcessMessage(header.Type, error, bytesTransferred);
221  }
222  );
223 
224 }
225 
226 void OpcTcpConnection::ProcessMessage(OpcUa::Binary::MessageType type, const boost::system::error_code & error, std::size_t bytesTransferred)
227 {
228  if (error)
229  {
230  LOG_ERROR(Logger, "opc_tcp_async | error receiving message body: {}", error.message());
231  GoodBye();
232  return;
233  }
234 
235  LOG_TRACE(Logger, "opc_tcp_async | received message: {}", ToHexDump(Buffer, bytesTransferred));
236 
237  // restrict server size code only with current message.
238  OpcUa::InputFromBuffer messageChannel(&Buffer[0], bytesTransferred);
239  IStreamBinary messageStream(messageChannel);
240 
241  bool cont = true;
242 
243  try
244  {
245  cont = MessageProcessor->ProcessMessage(type, messageStream);
246  }
247 
248  catch (const std::exception & exc)
249  {
250  LOG_ERROR(Logger, "opc_tcp_async | failed to process message: {}", exc.what());
251  GoodBye();
252  return;
253  }
254 
255  if (messageChannel.GetRemainSize())
256  {
257  std::cerr << "opc_tcp_async | ERROR!!! Message from client has been processed partially." << std::endl;
258  }
259 
260  if (!cont)
261  {
262  GoodBye();
263  return;
264  }
265 
266  ReadNextData();
267 }
268 
269 
270 void OpcTcpConnection::GoodBye()
271 {
272  // reference to shared instance
273  OpcTcpConnection::SharedPtr self = shared_from_this();
274  TcpServer.RemoveClient(self);
275  LOG_DEBUG(Logger, "opc_tcp_async | good bye");
276 }
277 
278 void OpcTcpConnection::Send(const char * message, std::size_t size)
279 {
280  std::shared_ptr<std::vector<char>> data = std::make_shared<std::vector<char>>(message, message + size);
281 
282  LOG_TRACE(Logger, "opc_tcp_async | send message: {}", ToHexDump(*data));
283 
284  // do not lose reference to shared instance even if another
285  // async operation decides to call GoodBye()
286  OpcTcpConnection::SharedPtr self = shared_from_this();
287  async_write(Socket, buffer(&(*data)[0], data->size()), [self, data](const boost::system::error_code & err, size_t bytes)
288  {
289  if (err)
290  {
291  LOG_ERROR(self->Logger, "opc_tcp_async | failed to send data: {}", err.message());
292  self->GoodBye();
293  return;
294  }
295 
296  LOG_DEBUG(self->Logger, "opc_tcp_async | response sent");
297  });
298 }
299 
300 OpcTcpServer::OpcTcpServer(const AsyncOpcTcp::Parameters & params, Services::SharedPtr server, boost::asio::io_service & ioService, const Common::Logger::SharedPtr & logger)
301  : Params(params)
302  , Server(server)
303  , Logger(logger)
304  , socket(ioService)
305  , acceptor(ioService)
306 {
307  tcp::endpoint ep;
308 
309  if (params.Host.empty())
310  {
311  ep = tcp::endpoint(tcp::v4(), params.Port);
312  }
313 
314  else if (params.Host == "localhost")
315  {
316  ep = tcp::endpoint(ip::address::from_string("127.0.0.1"), params.Port);
317  }
318 
319  else
320  {
321  ep = tcp::endpoint(ip::address::from_string(params.Host), params.Port);
322  }
323 
324  acceptor.open(ep.protocol());
325  acceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
326  acceptor.bind(ep);
327 }
328 
329 void OpcTcpServer::Listen()
330 {
331  LOG_DEBUG(Logger, "opc_tcp_async | running server");
332 
333  LOG_DEBUG(Logger, "opc_tcp_async | waiting for client connection at: {}:{}", acceptor.local_endpoint().address(), acceptor.local_endpoint().port());
334  acceptor.listen();
335 
336  Accept();
337 }
338 
340 {
341  LOG_DEBUG(Logger, "opc_tcp_async | shutting down server");
342  acceptor.close();
343 
344  // Actively shutdown OpcTcpConnections to clear open async requests from worker
345  // thread.
346  // Warning: the Clients container may be modified by OpcTcpConnections::GoodBye
347  // when calling Stop() which makes the iterator used in our for loop invalid.
348  // So have a copy of this container to have a stable iterator.
349 
350  // guard copy operation
351  typedef std::set<OpcTcpConnection::SharedPtr> OpcTcpConnectionSet;
352  OpcTcpConnectionSet tmp;
353  {
354  std::unique_lock<std::mutex> lock(Mutex);
355  tmp = OpcTcpConnectionSet(Clients);
356  }
357 
358  // Unlock before client->Stop() because stop will interrupt all pending
359  // async read/write operations, which may then call OpcTcpConnection::GoodBye(),
360  // which needs access to OpcTcpServer::Clients. Otherwise we run into a deadlock
361  // because stop waits for completion of pending operations.
362  for (auto client : tmp)
363  {
364  client->Stop();
365  }
366 
367  // clear possibly remaining Client's
368  {
369  std::unique_lock<std::mutex> lock(Mutex);
370  Clients.clear();
371  }
372 
373  /* queue a dummy operation to io_service to make sure we do not return
374  * until all existing async io requests of this instance are actually
375  * processed
376  */
377  typedef std::promise<void> Promise;
378  Promise promise;
379 #if BOOST_VERSION < 107000
380  acceptor.get_io_service().post(bind(&Promise::set_value, &promise));
381 #else
382  post(acceptor.get_executor(), bind(&Promise::set_value, &promise));
383 #endif
384  promise.get_future().wait();
385 }
386 
387 void OpcTcpServer::Accept()
388 {
389  try
390  {
391  acceptor.async_accept(socket, [this](boost::system::error_code errorCode)
392  {
393  if (!acceptor.is_open())
394  {
395  return;
396  }
397 
398  if (!errorCode)
399  {
400  LOG_DEBUG(Logger, "opc_tcp_async | accepted new client connection");
401  OpcTcpConnection::SharedPtr connection = OpcTcpConnection::create(std::move(socket), *this, Server, Logger);
402  {
403  std::unique_lock<std::mutex> lock(Mutex);
404  Clients.insert(connection);
405  }
406  connection->Start();
407  }
408 
409  else
410  {
411  LOG_WARN(Logger, "opc_tcp_async | error during client connection: {}", errorCode.message());
412  }
413 
414  Accept();
415  });
416  }
417 
418  catch (const std::exception & exc)
419  {
420  LOG_WARN(Logger, "opc_tcp_async | error accepting client connection: {}", exc.what());
421  }
422 }
423 
424 void OpcTcpServer::RemoveClient(OpcTcpConnection::SharedPtr client)
425 {
426  std::unique_lock<std::mutex> lock(Mutex);
427  Clients.erase(client);
428 }
429 
430 } // namespace
431 
432 OpcUa::Server::AsyncOpcTcp::UniquePtr OpcUa::Server::CreateAsyncOpcTcp(const OpcUa::Server::AsyncOpcTcp::Parameters & params, Services::SharedPtr server, boost::asio::io_service & io, const Common::Logger::SharedPtr & logger)
433 {
434  return AsyncOpcTcp::UniquePtr(new OpcTcpServer(params, server, io, logger));
435 }
#define LOG_TRACE(__logger__,...)
Definition: common/logger.h:23
#define LOG_WARN(__logger__,...)
Definition: common/logger.h:26
Addon interface definition GNU LGPL.
Opc Ua computer interface. GNU LGPL.
#define LOG_ERROR(__logger__,...)
Definition: common/logger.h:27
#define LOG_DEBUG(__logger__,...)
Definition: common/logger.h:24
std_msgs::Header * header(M &m)
Definition: client.py:1
fmt::BufferedFile & move(fmt::BufferedFile &f)
Definition: posix.h:432
message
Definition: server.py:50
std::shared_ptr< logger > create(const std::string &logger_name, const sink_ptr &sink)
Definition: spdlog_impl.h:179
OPC UA Address space part. GNU LGPL.
std::string ToHexDump(const char *buf, std::size_t size)
Definition: utils.h:29
#define DEFINE_CLASS_POINTERS(ClassName)
Exception declarations GNU LGPL.
AsyncOpcTcp::UniquePtr CreateAsyncOpcTcp(const AsyncOpcTcp::Parameters &params, Services::SharedPtr server, boost::asio::io_service &io, const Common::Logger::SharedPtr &logger)
Definition: server.py:1
std::size_t RawSize(const T &obj)


ros_opcua_impl_freeopcua
Author(s): Denis Štogl
autogenerated on Tue Jan 19 2021 03:12:07