2 Socket server forwarding request to internal server 9 import trollius
as asyncio
15 logger = logging.getLogger(__name__)
def __init__(self, internal_server, hostname, port):
    """
    Remember the internal server and the address to listen on.

    internal_server: object providing the event loop (``loop``); its
        ``asyncio_transports`` list is used by the connection and
        shutdown code.
    hostname: interface handed to ``loop.create_server``.
    port: TCP port handed to ``loop.create_server``; the start-up code
        replaces a value of 0 with the port actually bound.
    """
    self.logger = logging.getLogger(__name__)
    # The start-up code reads self.hostname / self.port and the shutdown
    # code reads self.iserver, so the constructor arguments must be kept.
    self.hostname = hostname
    self.port = port
    self.iserver = internal_server
    self.loop = internal_server.loop
    # Set by the start-up code once the listening server exists.
    self._server = None
34 class OPCUAProtocol(asyncio.Protocol):
37 instanciated for every connection 38 defined as internal class since it needs access 39 to the internal server object 40 FIXME: find another solution 48 def connection_made(self, transport):
49 self.
peername = transport.get_extra_info(
'peername')
50 self.logger.info(
'New connection from %s', self.
peername)
53 self.processor.set_policies(self.policies)
55 self.iserver.asyncio_transports.append(transport)
def connection_lost(self, ex):
    """
    Release everything tied to a connection that has gone away.

    ex: the exception that ended the connection, as passed in by the
        event loop; it is only logged.
    """
    self.logger.info('Lost connection from %s, %s', self.peername, ex)
    try:
        self.transport.close()
        try:
            self.iserver.asyncio_transports.remove(self.transport)
        except ValueError:
            # The transport is no longer in the shared list; there is
            # nothing to unregister, so carry on with the cleanup.
            self.logger.debug('Transport of %s was not registered', self.peername)
    finally:
        # The processor must be closed even when the steps above fail,
        # otherwise it would outlive its connection.
        self.processor.close()
def data_received(self, data):
    """
    Hand newly received bytes to the message parser.

    Bytes of an incomplete message kept from an earlier call are put in
    front of the new data first.

    data: bytes just read from the socket.
    """
    logger.debug("received %s bytes from socket", len(data))
    if self.data:
        # Use the buffered remainder exactly once: clear it here so the
        # same bytes are not prepended again on the next call.
        # _process_data stores a fresh remainder when it needs one.
        data = self.data + data
        self.data = b""
    self._process_data(data)
70 def _process_data(self, data):
71 buf = ua.utils.Buffer(data)
74 backup_buf = buf.copy()
76 hdr = ua.Header.from_string(buf)
77 except ua.utils.NotEnoughData:
78 logger.info(
"We did not receive enough data from client, waiting for more")
79 self.
data = backup_buf.read(len(backup_buf))
81 if len(buf) < hdr.body_size:
82 logger.info(
"We did not receive enough data from client, waiting for more")
83 self.
data = backup_buf.read(len(backup_buf))
85 ret = self.processor.process(hdr, buf)
87 logger.info(
"processor returned False, we close connection from %s", self.
peername)
88 self.transport.close()
93 logger.exception(
"Exception raised while parsing message from client, closing")
96 coro = self.loop.create_server(OPCUAProtocol, self.
hostname, self.
port)
97 self.
_server = self.loop.run_coro_and_wait(coro)
100 if self.
port == 0
and len(self._server.sockets) == 1:
103 sockname = self._server.sockets[0].getsockname()
105 self.
port = sockname[1]
106 print(
'Listening on {0}:{1}'.format(self.
hostname, self.
port))
109 self.logger.info(
"Closing asyncio socket server")
110 for transport
in self.iserver.asyncio_transports:
112 self.loop.call_soon(self._server.close)
113 self.loop.run_coro_and_wait(self._server.wait_closed())
def set_policies(self, policies)
def __init__(self, internal_server, hostname, port)