binary_server_asyncio.py
Go to the documentation of this file.
1 """
2 Socket server forwarding request to internal server
3 """
4 import logging
5 try:
6  # we prefer to use the bundled asyncio version, otherwise fall back to trollius
7  import asyncio
8 except ImportError:
9  import trollius as asyncio
10 
11 
12 from opcua import ua
13 from opcua.server.uaprocessor import UaProcessor
14 
15 logger = logging.getLogger(__name__)
16 
17 
class BinaryServer(object):
    """
    Socket server forwarding client requests to the internal server.

    Listens on ``hostname``/``port`` using the event loop exposed by the
    internal server and hands every complete message received on a
    connection to a ``UaProcessor``.
    """

    def __init__(self, internal_server, hostname, port):
        """
        :param internal_server: internal server object; its ``loop`` attribute
            is used to create the listening socket and its
            ``asyncio_transports`` list tracks the open client transports
        :param hostname: host name or address to listen on
        :param port: port to listen on; when 0, ``start`` replaces it with the
            port actually bound
        """
        self.logger = logging.getLogger(__name__)
        self.hostname = hostname
        self.port = port
        self.iserver = internal_server
        self.loop = internal_server.loop
        self._server = None  # set by start()
        self._policies = []

    def set_policies(self, policies):
        """
        Store the policies handed to the processor of every new connection.

        Only connections accepted by a server started afterwards see them,
        since the list is captured when ``start`` builds the protocol class.
        """
        self._policies = policies

    def start(self):
        """
        Create the listening socket and block until it is ready.

        When the server was configured with port 0 and exactly one socket was
        created, ``hostname`` and ``port`` are updated from that socket.
        """

        class OPCUAProtocol(asyncio.Protocol):

            """
            instantiated for every connection
            defined as internal class since it needs access
            to the internal server object
            FIXME: find another solution
            """

            iserver = self.iserver
            loop = self.loop
            logger = self.logger
            policies = self._policies

            def connection_made(self, transport):
                """Set up per-connection state and register the transport."""
                self.peername = transport.get_extra_info('peername')
                self.logger.info('New connection from %s', self.peername)
                self.transport = transport
                # the processor must exist before policies can be applied to it
                self.processor = UaProcessor(self.iserver, self.transport)
                self.processor.set_policies(self.policies)
                # bytes of an incomplete message, kept until more data arrives
                self.data = b""
                self.iserver.asyncio_transports.append(transport)

            def connection_lost(self, ex):
                """Close the transport, unregister it and close the processor."""
                self.logger.info('Lost connection from %s, %s', self.peername, ex)
                self.transport.close()
                self.iserver.asyncio_transports.remove(self.transport)
                self.processor.close()

            def data_received(self, data):
                """Prepend any buffered partial message and process the result."""
                self.logger.debug("received %s bytes from socket", len(data))
                if self.data:
                    data = self.data + data
                    self.data = b""
                self._process_data(data)

            def _process_data(self, data):
                """
                Process every complete message contained in ``data``.

                An incomplete trailing message is stored in ``self.data`` so
                that the next ``data_received`` call can complete it.
                """
                buf = ua.utils.Buffer(data)
                while True:
                    try:
                        # keep an untouched copy: parsing the header consumes buf
                        backup_buf = buf.copy()
                        try:
                            hdr = ua.Header.from_string(buf)
                        except ua.utils.NotEnoughData:
                            hdr = None
                        if hdr is None or len(buf) < hdr.body_size:
                            self.logger.info("We did not receive enough data from client, waiting for more")
                            self.data = backup_buf.read(len(backup_buf))
                            return
                        ret = self.processor.process(hdr, buf)
                        if not ret:
                            self.logger.info("processor returned False, we close connection from %s", self.peername)
                            self.transport.close()
                            return
                        if len(buf) == 0:
                            return
                    except Exception:
                        # NOTE(review): the message says "closing" but the
                        # transport is left open here - confirm whether it
                        # should be closed as well.
                        self.logger.exception("Exception raised while parsing message from client, closing")
                        return

        coro = self.loop.create_server(OPCUAProtocol, self.hostname, self.port)
        self._server = self.loop.run_coro_and_wait(coro)
        # get the port and the hostname from the created server socket
        # only relevant for dynamic port assignment (when self.port == 0)
        if self.port == 0 and len(self._server.sockets) == 1:
            # will work for AF_INET and AF_INET6 socket names
            # these are the only families supported by the create_server call
            sockname = self._server.sockets[0].getsockname()
            self.hostname = sockname[0]
            self.port = sockname[1]
        # report through logging rather than writing to stdout from library code
        self.logger.warning('Listening on %s:%s', self.hostname, self.port)

    def stop(self):
        """
        Close all client transports and, if it was started, the listening server.

        Safe to call when ``start`` was never called.
        """
        self.logger.info("Closing asyncio socket server")
        # iterate over a snapshot: connection_lost removes entries from the list
        for transport in list(self.iserver.asyncio_transports):
            transport.close()
        if self._server is None:
            return
        self.loop.call_soon(self._server.close)
        self.loop.run_coro_and_wait(self._server.wait_closed())
def __init__(self, internal_server, hostname, port)


ros_opcua_impl_python_opcua
Author(s): Denis Štogl , Daniel Draper
autogenerated on Tue Jan 19 2021 03:12:43