utils.py
Go to the documentation of this file.
1 import logging
2 import os
3 from concurrent.futures import Future
4 import functools
5 import threading
6 from socket import error as SocketError
7 
8 try:
9  # we prefer to use bundles asyncio version, otherwise fallback to trollius
10  import asyncio
11 except ImportError:
12  import trollius as asyncio
13 
14 
15 from opcua.ua.uaerrors import UaError
16 
17 
18 class ServiceError(UaError):
19  def __init__(self, code):
20  super(ServiceError, self).__init__('UA Service Error')
21  self.code = code
22 
23 
24 class NotEnoughData(UaError):
25  pass
26 
27 
28 class SocketClosedException(UaError):
29  pass
30 
31 
32 class Buffer(object):
33 
34  """
35  alternative to io.BytesIO making debug easier
36  and added a few conveniance methods
37  """
38 
39  def __init__(self, data, start_pos=0, size=-1):
40  # self.logger = logging.getLogger(__name__)
41  self._data = data
42  self._cur_pos = start_pos
43  if size == -1:
44  size = len(data) - start_pos
45  self._size = size
46 
47  def __str__(self):
48  return "Buffer(size:{0}, data:{1})".format(
49  self._size,
50  self._data[self._cur_pos:self._cur_pos + self._size])
51  __repr__ = __str__
52 
53  def __len__(self):
54  return self._size
55 
56  def read(self, size):
57  """
58  read and pop number of bytes for buffer
59  """
60  if size > self._size:
61  raise NotEnoughData("Not enough data left in buffer, request for {0}, we have {1}".format(size, self))
62  # self.logger.debug("Request for %s bytes, from %s", size, self)
63  self._size -= size
64  pos = self._cur_pos
65  self._cur_pos += size
66  data = self._data[pos:self._cur_pos]
67  # self.logger.debug("Returning: %s ", data)
68  return data
69 
70  def copy(self, size=-1):
71  """
72  return a shadow copy, optionnaly only copy 'size' bytes
73  """
74  if size == -1 or size > self._size:
75  size = self._size
76  return Buffer(self._data, self._cur_pos, size)
77 
78  def skip(self, size):
79  """
80  skip size bytes in buffer
81  """
82  if size > self._size:
83  raise NotEnoughData("Not enough data left in buffer, request for {0}, we have {1}".format(size, self))
84  self._size -= size
85  self._cur_pos += size
86 
87 
88 class SocketWrapper(object):
89  """
90  wrapper to make it possible to have same api for
91  normal sockets, socket from asyncio, StringIO, etc....
92  """
93 
94  def __init__(self, sock):
95  self.socket = sock
96 
97  def read(self, size):
98  """
99  Receive up to size bytes from socket
100  """
101  data = b''
102  while size > 0:
103  try:
104  chunk = self.socket.recv(size)
105  except (OSError, SocketError) as ex:
106  raise SocketClosedException("Server socket has closed", ex)
107  if not chunk:
108  raise SocketClosedException("Server socket has closed")
109  data += chunk
110  size -= len(chunk)
111  return data
112 
113  def write(self, data):
114  self.socket.sendall(data)
115 
116 
117 def create_nonce(size=32):
118  return os.urandom(size)
119 
120 
121 class ThreadLoop(threading.Thread):
122  """
123  run an asyncio loop in a thread
124  """
125 
126  def __init__(self):
127  threading.Thread.__init__(self)
128  self.logger = logging.getLogger(__name__)
129  self.loop = None
130  self._cond = threading.Condition()
131 
132  def start(self):
133  with self._cond:
134  threading.Thread.start(self)
135  self._cond.wait()
136 
137  def run(self):
138  self.logger.debug("Starting subscription thread")
139  self.loop = asyncio.new_event_loop()
140  asyncio.set_event_loop(self.loop)
141  with self._cond:
142  self._cond.notify_all()
143  self.loop.run_forever()
144  self.logger.debug("subscription thread ended")
145 
146  def create_server(self, proto, hostname, port):
147  return self.loop.create_server(proto, hostname, port)
148 
149  def stop(self):
150  """
151  stop subscription loop, thus the subscription thread
152  """
153  self.loop.call_soon_threadsafe(self.loop.stop)
154 
155  def call_soon(self, callback):
156  self.loop.call_soon_threadsafe(callback)
157 
158  def call_later(self, delay, callback):
159  """
160  threadsafe call_later from asyncio
161  """
162  p = functools.partial(self.loop.call_later, delay, callback)
163  self.loop.call_soon_threadsafe(p)
164 
165  def _create_task(self, future, coro, cb=None):
166  #task = self.loop.create_task(coro)
167  task = asyncio.async(coro, loop=self.loop)
168  if cb:
169  task.add_done_callback(cb)
170  future.set_result(task)
171 
172  def create_task(self, coro, cb=None):
173  """
174  threadsafe create_task from asyncio
175  """
176  future = Future()
177  p = functools.partial(self._create_task, future, coro, cb)
178  self.loop.call_soon_threadsafe(p)
179  return future.result()
180 
181  def run_coro_and_wait(self, coro):
182  cond = threading.Condition()
183  def cb(_):
184  with cond:
185  cond.notify_all()
186  with cond:
187  task = self.create_task(coro, cb)
188  cond.wait()
189  return task.result()
190 
191  def _run_until_complete(self, future, coro):
192  task = self.loop.run_until_complete(coro)
193  future.set_result(task)
194 
195  def run_until_complete(self, coro):
196  """
197  threadsafe run_until_completed from asyncio
198  """
199  future = Future()
200  p = functools.partial(self._run_until_complete, future, coro)
201  self.loop.call_soon_threadsafe(p)
202  return future.result()
203 
204 
205 
def call_soon(self, callback)
Definition: utils.py:155
def skip(self, size)
Definition: utils.py:78
def create_server(self, proto, hostname, port)
Definition: utils.py:146
def create_nonce(size=32)
Definition: utils.py:117
def __init__(self, data, start_pos=0, size=-1)
Definition: utils.py:39
def __init__(self, sock)
Definition: utils.py:94
def read(self, size)
Definition: utils.py:56
def run_coro_and_wait(self, coro)
Definition: utils.py:181
def read(self, size)
Definition: utils.py:97
def _create_task(self, future, coro, cb=None)
Definition: utils.py:165
def call_later(self, delay, callback)
Definition: utils.py:158
def __init__(self, code)
Definition: utils.py:19
def _run_until_complete(self, future, coro)
Definition: utils.py:191
def copy(self, size=-1)
Definition: utils.py:70
def create_task(self, coro, cb=None)
Definition: utils.py:172
def run_until_complete(self, coro)
Definition: utils.py:195


ros_opcua_impl_python_opcua
Author(s): Denis Štogl , Daniel Draper
autogenerated on Tue Jan 19 2021 03:12:44