binary_server_asyncio.py
Go to the documentation of this file.
00001 """
00002 Socket server forwarding request to internal server
00003 """
00004 import logging
00005 try:
00006     # we prefer to use the bundled asyncio version, otherwise fall back to trollius
00007     import asyncio
00008 except ImportError:
00009     import trollius as asyncio
00010 
00011 
00012 from opcua import ua
00013 from opcua.server.uaprocessor import UaProcessor
00014 
00015 logger = logging.getLogger(__name__)
00016 
00017 
class BinaryServer(object):
    """
    Socket server forwarding binary requests to an internal server.

    Each accepted connection gets its own protocol instance, which buffers
    incoming bytes and hands every complete message to a ``UaProcessor``
    bound to the internal server.
    """

    def __init__(self, internal_server, hostname, port):
        """
        Store the listening parameters; nothing is opened until ``start()``.

        :param internal_server: object the requests are forwarded to. Its
            ``loop`` attribute is used to create and run the socket server,
            and its ``asyncio_transports`` list tracks open connections.
        :param hostname: host name or address to listen on
        :param port: port to listen on; with 0, ``start()`` replaces it by
            the port actually bound (when exactly one socket was created)
        """
        self.logger = logging.getLogger(__name__)
        self.hostname = hostname
        self.port = port
        self.iserver = internal_server
        self.loop = internal_server.loop
        self._server = None
        self._policies = []

    def set_policies(self, policies):
        """
        Set the policies handed to the processor of every new connection.

        The protocol class captures ``self._policies`` when ``start()`` runs,
        so this must be called before ``start()`` to have any effect.
        """
        self._policies = policies

    def start(self):
        """
        Create the listening socket server and wait until it is ready.

        Updates ``self.hostname`` and ``self.port`` from the bound socket when
        a dynamic port (0) was requested.
        """

        class OPCUAProtocol(asyncio.Protocol):

            """
            instantiated for every connection
            defined as internal class since it needs access
            to the internal server object
            FIXME: find another solution
            """

            # shared by all connections; captured from the enclosing
            # BinaryServer at the time start() is called
            iserver = self.iserver
            loop = self.loop
            logger = self.logger
            policies = self._policies

            def connection_made(self, transport):
                """Set up per-connection state and register the transport."""
                self.peername = transport.get_extra_info('peername')
                self.logger.info('New connection from %s', self.peername)
                self.transport = transport
                self.processor = UaProcessor(self.iserver, self.transport)
                self.processor.set_policies(self.policies)
                # bytes of an incomplete message, kept until more data arrives
                self.data = b""
                self.iserver.asyncio_transports.append(transport)

            def connection_lost(self, ex):
                """Unregister the transport and close the processor."""
                self.logger.info('Lost connection from %s, %s', self.peername, ex)
                self.transport.close()
                self.iserver.asyncio_transports.remove(self.transport)
                self.processor.close()

            def data_received(self, data):
                """Prepend any leftover bytes to ``data`` and process the result."""
                self.logger.debug("received %s bytes from socket", len(data))
                if self.data:
                    data = self.data + data
                    self.data = b""
                self._process_data(data)

            def _process_data(self, data):
                """
                Process every complete message contained in ``data``.

                An incomplete trailing message (header or body) is stored in
                ``self.data`` and completed by the next ``data_received`` call.
                """
                buf = ua.utils.Buffer(data)
                while True:
                    try:
                        # keep the unparsed bytes so they can be restored if
                        # the message turns out to be incomplete
                        backup_buf = buf.copy()
                        try:
                            hdr = ua.Header.from_string(buf)
                        except ua.utils.NotEnoughData:
                            self.logger.info("We did not receive enough data from client, waiting for more")
                            self.data = backup_buf.read(len(backup_buf))
                            return
                        if len(buf) < hdr.body_size:
                            self.logger.info("We did not receive enough data from client, waiting for more")
                            self.data = backup_buf.read(len(backup_buf))
                            return
                        ret = self.processor.process(hdr, buf)
                        if not ret:
                            self.logger.info("processor returned False, we close connection from %s", self.peername)
                            self.transport.close()
                            return
                        if len(buf) == 0:
                            return
                    except Exception:
                        # NOTE(review): the message says "closing" but the
                        # transport is not closed here; the rest of the buffer
                        # is simply dropped - confirm this is intended
                        self.logger.exception("Exception raised while parsing message from client, closing")
                        return

        coro = self.loop.create_server(OPCUAProtocol, self.hostname, self.port)
        self._server = self.loop.run_coro_and_wait(coro)
        # get the port and the hostname from the created server socket
        # only relevant for dynamic port assignment (when self.port == 0)
        if self.port == 0 and len(self._server.sockets) == 1:
            # will work for AF_INET and AF_INET6 socket names
            # these are the only families supported by the create_server call
            sockname = self._server.sockets[0].getsockname()
            self.hostname = sockname[0]
            self.port = sockname[1]
        print('Listening on {0}:{1}'.format(self.hostname, self.port))

    def stop(self):
        """
        Close every open client transport, then the listening server.

        Safe to call when ``start()`` was never called or did not complete:
        in that case only the registered transports are closed.
        """
        self.logger.info("Closing asyncio socket server")
        # iterate over a snapshot: connection_lost() removes entries from
        # this very list, which must not disturb the iteration
        for transport in list(self.iserver.asyncio_transports):
            transport.close()
        if self._server is None:
            # nothing is listening, so there is nothing to close or wait for
            return
        self.loop.call_soon(self._server.close)
        self.loop.run_coro_and_wait(self._server.wait_closed())


ros_opcua_impl_python_opcua
Author(s): Denis Štogl , Daniel Draper
autogenerated on Sat Jun 8 2019 18:26:23