00001 import logging
00002 import os
00003 from concurrent.futures import Future
00004 import functools
00005 import threading
00006 from socket import error as SocketError
00007
00008 try:
00009
00010 import asyncio
00011 except ImportError:
00012 import trollius as asyncio
00013
00014
00015 from opcua.ua.uaerrors import UaError
00016
00017
class ServiceError(UaError):
    """
    UaError raised with the fixed message 'UA Service Error'.

    The value given to the constructor is kept unchanged in the
    ``code`` attribute so callers can inspect it.
    """

    def __init__(self, code):
        message = 'UA Service Error'
        super(ServiceError, self).__init__(message)
        self.code = code
00022
00023
class NotEnoughData(UaError):
    """
    Raised when more bytes are requested than are available.
    """
00026
00027
class SocketClosedException(UaError):
    """
    Raised when reading from a socket fails or yields no data.
    """
00030
00031
class Buffer(object):

    """
    Lightweight stand-in for io.BytesIO that is easier to inspect
    while debugging and offers a few convenience methods.

    The wrapped data is never modified: the buffer only tracks a
    current position and the number of bytes still available from it.
    """

    def __init__(self, data, start_pos=0, size=-1):
        """
        Wrap ``data`` starting at ``start_pos``.

        A ``size`` of -1 means "everything from start_pos to the end
        of data"; any other value is used as the available byte count.
        """
        self._data = data
        self._cur_pos = start_pos
        self._size = len(data) - start_pos if size == -1 else size

    def __str__(self):
        end = self._cur_pos + self._size
        visible = self._data[self._cur_pos:end]
        return "Buffer(size:{0}, data:{1})".format(self._size, visible)
    __repr__ = __str__

    def __len__(self):
        """
        number of bytes still available in the buffer
        """
        return self._size

    def _advance(self, size):
        """
        move the current position forward by size bytes and return
        the position it had before the move
        """
        if size > self._size:
            raise NotEnoughData("Not enough data left in buffer, request for {0}, we have {1}".format(size, self))
        previous = self._cur_pos
        self._cur_pos = previous + size
        self._size -= size
        return previous

    def read(self, size):
        """
        consume size bytes from the buffer and return them;
        raises NotEnoughData if fewer than size bytes are left
        """
        begin = self._advance(size)
        return self._data[begin:self._cur_pos]

    def copy(self, size=-1):
        """
        return a new Buffer over the same underlying data, starting at
        the current position; when size is given it caps the length of
        the copy (never beyond what is left in this buffer)
        """
        available = self._size
        length = available if (size == -1 or size > available) else size
        return Buffer(self._data, self._cur_pos, length)

    def skip(self, size):
        """
        discard size bytes from the buffer;
        raises NotEnoughData if fewer than size bytes are left
        """
        self._advance(size)
00086
00087
class SocketWrapper(object):
    """
    Thin adapter giving a uniform read/write API on top of any object
    that provides ``recv`` and ``sendall`` (normal sockets, sockets
    from asyncio, in-memory stand-ins, ...).
    """

    def __init__(self, sock):
        self.socket = sock

    def read(self, size):
        """
        Receive exactly size bytes from the socket.

        Calls recv repeatedly until size bytes have been collected.
        Raises SocketClosedException if recv fails with a socket/OS
        error or returns no data before size bytes were received.
        """
        chunks = []
        remaining = size
        while remaining > 0:
            try:
                piece = self.socket.recv(remaining)
            except (OSError, SocketError) as ex:
                raise SocketClosedException("Server socket has closed", ex)
            if not piece:
                raise SocketClosedException("Server socket has closed")
            chunks.append(piece)
            remaining -= len(piece)
        return b''.join(chunks)

    def write(self, data):
        """
        Send all of data through the socket's sendall.
        """
        self.socket.sendall(data)
00115
00116
def create_nonce(size=32):
    """
    Return ``size`` random bytes obtained from os.urandom
    (32 bytes when no size is given).
    """
    nonce = os.urandom(size)
    return nonce
00119
00120
class ThreadLoop(threading.Thread):
    """
    Run an asyncio event loop in a dedicated thread.

    The loop is created inside the thread by run(). The helper methods
    hand work over to it through call_soon_threadsafe, so they are
    meant to be called from other threads once start() has returned.
    """

    def __init__(self):
        threading.Thread.__init__(self)
        self.logger = logging.getLogger(__name__)
        # Created by run() in the loop thread; None until then.
        self.loop = None
        # Lets start() block until run() has published self.loop.
        self._cond = threading.Condition()

    def start(self):
        """
        Start the thread and block until its event loop has been created.
        """
        with self._cond:
            threading.Thread.start(self)
            # Predicate loop: a spurious wakeup must not let the caller
            # continue before the loop object exists.
            while self.loop is None:
                self._cond.wait()

    def run(self):
        """
        Thread body: create the loop, publish it, then run it until
        stop() is called.
        """
        self.logger.debug("Starting subscription thread")
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        with self._cond:
            # Publish under the lock so start() observes a fully set up loop.
            self.loop = loop
            self._cond.notify_all()
        self.loop.run_forever()
        self.logger.debug("subscription thread ended")

    def create_server(self, proto, hostname, port):
        """
        Return the result of the loop's create_server(proto, hostname, port).

        Nothing is scheduled here; the returned object still has to be
        run on the loop by the caller.
        """
        return self.loop.create_server(proto, hostname, port)

    def stop(self):
        """
        stop subscription loop, thus the subscription thread
        """
        self.loop.call_soon_threadsafe(self.loop.stop)

    def call_soon(self, callback):
        """
        threadsafe call_soon from asyncio
        """
        self.loop.call_soon_threadsafe(callback)

    def call_later(self, delay, callback):
        """
        threadsafe call_later from asyncio
        """
        p = functools.partial(self.loop.call_later, delay, callback)
        self.loop.call_soon_threadsafe(p)

    def _ensure_future(self, coro):
        """
        Schedule coro on self.loop and return the resulting task.

        'async' is a reserved word since Python 3.7, so the legacy
        asyncio.async spelling can only be reached through getattr; it
        is used solely when ensure_future is not available.
        """
        schedule = getattr(asyncio, "ensure_future", None)
        if schedule is None:
            schedule = getattr(asyncio, "async")
        return schedule(coro, loop=self.loop)

    def _create_task(self, future, coro, cb=None):
        """
        Loop-thread half of create_task: schedule coro, attach cb as a
        done callback if given, and hand the task back through future.
        """
        try:
            task = self._ensure_future(coro)
            if cb:
                task.add_done_callback(cb)
        except Exception as ex:
            # Report the failure to the waiting thread instead of
            # leaving it blocked on future.result() forever.
            future.set_exception(ex)
        else:
            future.set_result(task)

    def create_task(self, coro, cb=None):
        """
        threadsafe create_task from asyncio

        Blocks until the loop thread has scheduled coro and returns the
        task. cb, when given, is added as a done callback of the task.
        """
        future = Future()
        p = functools.partial(self._create_task, future, coro, cb)
        self.loop.call_soon_threadsafe(p)
        return future.result()

    def run_coro_and_wait(self, coro):
        """
        Run coro on the loop, block until it is done and return its
        result (or raise what the task raised).
        """
        finished = threading.Event()
        task = self.create_task(coro, lambda _: finished.set())
        # Event.wait returns at once if the callback already fired.
        finished.wait()
        return task.result()

    @staticmethod
    def _forward_result(future, task):
        """
        Copy the outcome of a finished asyncio task onto a
        concurrent.futures.Future.
        """
        if task.cancelled():
            future.cancel()
            return
        error = task.exception()
        if error is not None:
            future.set_exception(error)
        else:
            future.set_result(task.result())

    def _run_until_complete(self, future, coro):
        """
        Loop-thread half of run_until_complete.

        loop.run_until_complete cannot be used here: this code runs
        inside the already running loop, where it raises RuntimeError.
        The coroutine is scheduled as a task instead and its outcome is
        forwarded to future when it finishes.
        """
        try:
            task = self._ensure_future(coro)
            task.add_done_callback(
                functools.partial(self._forward_result, future))
        except Exception as ex:
            future.set_exception(ex)

    def run_until_complete(self, coro):
        """
        threadsafe run_until_completed from asyncio

        Blocks the calling thread until coro has finished on the loop
        and returns its result; an exception raised by coro is
        re-raised here.
        """
        future = Future()
        p = functools.partial(self._run_until_complete, future, coro)
        self.loop.call_soon_threadsafe(p)
        return future.result()
00203
00204
00205