utils.py
Go to the documentation of this file.
00001 import logging
00002 import os
00003 from concurrent.futures import Future
00004 import functools
00005 import threading
00006 from socket import error as SocketError
00007 
00008 try:
00009     # we prefer to use the bundled asyncio version, otherwise fall back to trollius
00010     import asyncio
00011 except ImportError:
00012     import trollius as asyncio
00013 
00014 
00015 from opcua.ua.uaerrors import UaError
00016 
00017 
class ServiceError(UaError):
    """
    UaError subclass with the fixed message 'UA Service Error'.

    The value passed to the constructor is kept on the instance as `code`.
    """

    def __init__(self, code):
        UaError.__init__(self, 'UA Service Error')
        self.code = code
00022 
00023 
class NotEnoughData(UaError):
    """
    Raised by Buffer.read and Buffer.skip when more bytes are requested
    than the buffer still holds.
    """
00026 
00027 
class SocketClosedException(UaError):
    """
    Raised by SocketWrapper.read when recv fails with a socket error or
    returns no data.
    """
00030 
00031 
class Buffer(object):

    """
    Alternative to io.BytesIO that is easier to inspect while debugging
    and adds a few convenience methods.

    A Buffer is a window onto `data`: it starts at a current position and
    spans `size` remaining bytes. Reading or skipping moves the start of
    the window forward and shrinks it; the underlying data is never
    modified.
    """

    def __init__(self, data, start_pos=0, size=-1):
        """
        data: sliceable object holding the bytes
        start_pos: index in data where the window starts
        size: number of bytes in the window, -1 meaning everything from
              start_pos to the end of data
        """
        self._data = data
        self._cur_pos = start_pos
        self._size = len(data) - start_pos if size == -1 else size

    def __str__(self):
        window_end = self._cur_pos + self._size
        remaining = self._data[self._cur_pos:window_end]
        return "Buffer(size:{0}, data:{1})".format(self._size, remaining)
    __repr__ = __str__

    def __len__(self):
        # number of bytes that can still be read or skipped
        return self._size

    def read(self, size):
        """
        Consume `size` bytes from the front of the buffer and return them.

        Raises NotEnoughData when `size` is larger than what is left.
        """
        if size > self._size:
            raise NotEnoughData("Not enough data left in buffer, request for {0}, we have {1}".format(size, self))
        first = self._cur_pos
        last = first + size
        self._cur_pos = last
        self._size -= size
        return self._data[first:last]

    def copy(self, size=-1):
        """
        Return a new Buffer over the same underlying data, starting at the
        current position. Only `size` bytes are exposed when given; -1 or
        a value larger than what is left exposes everything remaining.
        """
        length = self._size if (size == -1 or size > self._size) else size
        return Buffer(self._data, self._cur_pos, length)

    def skip(self, size):
        """
        Drop `size` bytes from the front of the buffer without returning
        them.

        Raises NotEnoughData when `size` is larger than what is left.
        """
        if size > self._size:
            raise NotEnoughData("Not enough data left in buffer, request for {0}, we have {1}".format(size, self))
        self._cur_pos += size
        self._size -= size
00086 
00087 
class SocketWrapper(object):
    """
    wrapper to make it possible to have same api for
    normal sockets, socket from asyncio, StringIO, etc....

    The wrapped object only needs to provide recv(size) and sendall(data).
    """

    def __init__(self, sock):
        self.socket = sock

    def read(self, size):
        """
        Receive exactly size bytes from socket

        recv is called repeatedly until the requested amount has been
        collected. A size of zero or less returns b'' without touching
        the socket.

        Raises SocketClosedException when recv fails with OSError or
        socket.error, or when it returns no data before size bytes were
        received.
        """
        # collect the pieces and join once instead of re-copying the
        # accumulated bytes on every iteration
        chunks = []
        while size > 0:
            try:
                chunk = self.socket.recv(size)
            except (OSError, SocketError) as ex:
                raise SocketClosedException("Server socket has closed", ex)
            if not chunk:
                # an empty result from recv is treated as a closed socket
                raise SocketClosedException("Server socket has closed")
            chunks.append(chunk)
            size -= len(chunk)
        return b''.join(chunks)

    def write(self, data):
        """
        Send all of data through the socket's sendall
        """
        self.socket.sendall(data)
00113     def write(self, data):
00114         self.socket.sendall(data)
00115 
00116 
def create_nonce(size=32):
    """
    Return `size` random bytes taken from os.urandom (32 by default).
    """
    nonce = os.urandom(size)
    return nonce
00119 
00120 
class ThreadLoop(threading.Thread):
    """
    run an asyncio loop in a thread

    The loop is created inside run(), so `loop` is None until start() has
    returned. The other methods hand work over to that loop through
    call_soon_threadsafe. The blocking ones (create_task,
    run_coro_and_wait, run_until_complete) wait for the loop thread to
    act and therefore must not be called from a callback running in the
    loop itself.
    """

    def __init__(self):
        threading.Thread.__init__(self)
        self.logger = logging.getLogger(__name__)
        self.loop = None
        self._cond = threading.Condition()

    def start(self):
        """
        start the thread and block until run() has created the event loop
        """
        with self._cond:
            threading.Thread.start(self)
            # wait on the predicate, not just the notification, so that a
            # spurious wakeup cannot make us return before the loop exists
            while self.loop is None:
                self._cond.wait()

    def run(self):
        """
        thread body: create the loop, wake up start(), then run forever
        """
        self.logger.debug("Starting subscription thread")
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        with self._cond:
            self._cond.notify_all()
        self.loop.run_forever()
        self.logger.debug("subscription thread ended")

    def create_server(self, proto, hostname, port):
        """
        return the result of loop.create_server(proto, hostname, port)
        """
        return self.loop.create_server(proto, hostname, port)

    def stop(self):
        """
        stop subscription loop, thus the subscription thread
        """
        self.loop.call_soon_threadsafe(self.loop.stop)

    def call_soon(self, callback):
        """
        threadsafe call_soon from asyncio
        """
        self.loop.call_soon_threadsafe(callback)

    def call_later(self, delay, callback):
        """
        threadsafe call_later from asyncio
        """
        p = functools.partial(self.loop.call_later, delay, callback)
        self.loop.call_soon_threadsafe(p)

    def _ensure_future(self, coro):
        """
        wrap coro in a task bound to our loop; to be run in the loop thread
        """
        # 'async' is a reserved word since Python 3.7, so the legacy
        # asyncio.async() spelling can only be looked up by name; it is
        # kept as a fallback for asyncio/trollius versions that predate
        # ensure_future
        schedule = getattr(asyncio, "ensure_future", None)
        if schedule is None:
            schedule = getattr(asyncio, "async")
        return schedule(coro, loop=self.loop)

    def _create_task(self, future, coro, cb=None):
        """
        loop-side half of create_task: build the task, attach cb if given
        and hand the task back through future
        """
        try:
            task = self._ensure_future(coro)
            if cb:
                task.add_done_callback(cb)
        except Exception as ex:
            # forward the failure, otherwise the thread blocked in
            # create_task() would wait forever
            future.set_exception(ex)
        else:
            future.set_result(task)

    def create_task(self, coro, cb=None):
        """
        threadsafe create_task from asyncio

        Blocks until the loop thread has created the task and returns it.
        If cb is given it is added as done callback of the task. An error
        raised while creating the task is re-raised here.
        """
        future = Future()
        p = functools.partial(self._create_task, future, coro, cb)
        self.loop.call_soon_threadsafe(p)
        return future.result()

    def run_coro_and_wait(self, coro):
        """
        run coro in the loop, block until it is done and return its result
        (or raise the exception it raised)
        """
        # an Event stays set once the task has finished, so it does not
        # matter whether the done callback fires before or after we wait
        done = threading.Event()

        def cb(_):
            done.set()

        task = self.create_task(coro, cb)
        done.wait()
        return task.result()

    def _run_until_complete(self, future, coro):
        """
        loop-side half of run_until_complete: schedule coro as a task and
        copy its outcome into future once it is done
        """
        # loop.run_until_complete() cannot be used here: this method is
        # executed by the loop, which is already running

        def forward(task):
            if task.cancelled():
                future.cancel()
                return
            error = task.exception()
            if error is not None:
                future.set_exception(error)
            else:
                future.set_result(task.result())

        try:
            task = self._ensure_future(coro)
            task.add_done_callback(forward)
        except Exception as ex:
            # never leave the waiting thread without an answer
            future.set_exception(ex)

    def run_until_complete(self, coro):
        """
        threadsafe run_until_completed from asyncio

        Blocks until coro has finished in the loop thread, then returns
        its result or raises the exception it raised.
        """
        future = Future()
        p = functools.partial(self._run_until_complete, future, coro)
        self.loop.call_soon_threadsafe(p)
        return future.result()
00203 
00204 
00205 


ros_opcua_impl_python_opcua
Author(s): Denis Štogl, Daniel Draper
autogenerated on Sat Jun 8 2019 18:26:24