Go to the documentation of this file.00001 """
00002 Socket server forwarding request to internal server
00003 """
00004 import logging
00005 try:
00006
00007 import asyncio
00008 except ImportError:
00009 import trollius as asyncio
00010
00011
00012 from opcua import ua
00013 from opcua.server.uaprocessor import UaProcessor
00014
00015 logger = logging.getLogger(__name__)
00016
00017
class BinaryServer(object):
    """
    Socket server that forwards client requests to an internal server.

    Incoming bytes are split into messages (header + body) and each message
    is handed to a per-connection ``UaProcessor``.
    """

    def __init__(self, internal_server, hostname, port):
        """
        Remember where to listen and which internal server to serve.

        No socket is opened here; see ``start()``.

        :param internal_server: object exposing ``loop`` and
            ``asyncio_transports``; it is also passed to every ``UaProcessor``.
        :param hostname: host handed to ``loop.create_server``.
        :param port: port handed to ``loop.create_server``. When it is 0,
            ``start()`` replaces it with the port reported by the listening
            socket.
        """
        self.logger = logging.getLogger(__name__)
        self.hostname = hostname
        self.port = port
        self.iserver = internal_server
        self.loop = internal_server.loop
        # Stays None until start() has created the listening server.
        self._server = None
        self._policies = []

    def set_policies(self, policies):
        """
        Set the policies given to the processor of each new connection.

        The value is captured when ``start()`` runs, so call this before
        starting the server.
        """
        self._policies = policies

    def start(self):
        """
        Create the listening server and store it in ``self._server``.

        When ``self.port`` is 0 and exactly one socket was created,
        ``self.hostname`` and ``self.port`` are overwritten with the address
        reported by that socket. The listening address is then printed.
        """

        class OPCUAProtocol(asyncio.Protocol):

            """
            instantiated for every connection
            defined as internal class since it needs access
            to the internal server object
            FIXME: find another solution
            """

            # Snapshot of the enclosing BinaryServer state, shared by all
            # connections accepted by this server.
            iserver = self.iserver
            loop = self.loop
            logger = self.logger
            policies = self._policies

            def connection_made(self, transport):
                """Set up per-connection state and register the transport."""
                self.peername = transport.get_extra_info('peername')
                self.logger.info('New connection from %s', self.peername)
                self.transport = transport
                self.processor = UaProcessor(self.iserver, self.transport)
                self.processor.set_policies(self.policies)
                # Bytes of an incomplete message, kept until more data arrives.
                self.data = b""
                self.iserver.asyncio_transports.append(transport)

            def connection_lost(self, ex):
                """Unregister the transport and close the processor."""
                self.logger.info('Lost connection from %s, %s', self.peername, ex)
                self.transport.close()
                self.iserver.asyncio_transports.remove(self.transport)
                self.processor.close()

            def data_received(self, data):
                """Prepend any leftover bytes to ``data`` and process the result."""
                self.logger.debug("received %s bytes from socket", len(data))
                if self.data:
                    data = self.data + data
                    self.data = b""
                self._process_data(data)

            def _process_data(self, data):
                """
                Process every complete message contained in ``data``.

                An incomplete trailing message is saved in ``self.data`` so
                the next ``data_received`` call can complete it. The
                transport is closed when the processor returns a false value.
                """
                buf = ua.utils.Buffer(data)
                while True:
                    try:
                        # Copy taken before the header is parsed so the whole
                        # message can be stashed if it proves incomplete.
                        backup_buf = buf.copy()
                        try:
                            hdr = ua.Header.from_string(buf)
                        except ua.utils.NotEnoughData:
                            self.logger.info("We did not receive enough data from client, waiting for more")
                            self.data = backup_buf.read(len(backup_buf))
                            return
                        if len(buf) < hdr.body_size:
                            self.logger.info("We did not receive enough data from client, waiting for more")
                            self.data = backup_buf.read(len(backup_buf))
                            return
                        ret = self.processor.process(hdr, buf)
                        if not ret:
                            self.logger.info("processor returned False, we close connection from %s", self.peername)
                            self.transport.close()
                            return
                        if len(buf) == 0:
                            return
                    except Exception:
                        # NOTE(review): despite the wording of the message the
                        # transport is not closed here; only this batch of
                        # data is abandoned. Confirm whether that is intended.
                        self.logger.exception("Exception raised while parsing message from client, closing")
                        return

        coro = self.loop.create_server(OPCUAProtocol, self.hostname, self.port)
        self._server = self.loop.run_coro_and_wait(coro)

        # Port 0: read back the address the listening socket actually got.
        if self.port == 0 and len(self._server.sockets) == 1:
            sockname = self._server.sockets[0].getsockname()
            self.hostname = sockname[0]
            self.port = sockname[1]
        print('Listening on {0}:{1}'.format(self.hostname, self.port))

    def stop(self):
        """
        Close every registered transport, then the listening server.

        Safe to call when ``start()`` was never called or did not complete:
        in that case only the transports are closed.
        """
        self.logger.info("Closing asyncio socket server")
        # Iterate over a snapshot: connection_lost() removes entries from
        # this same list.
        for transport in list(self.iserver.asyncio_transports):
            transport.close()
        # _server is None until start() succeeds; there is nothing to close.
        if self._server is not None:
            self.loop.call_soon(self._server.close)
            self.loop.run_coro_and_wait(self._server.wait_closed())