ua_client.py
Go to the documentation of this file.
1 """
2 Low level binary client
3 """
4 
5 import logging
6 import socket
7 from threading import Thread, Lock
8 from concurrent.futures import Future
9 from functools import partial
10 
11 from opcua import ua
12 from opcua.common import utils
13 from opcua.ua.uaerrors import UaError, BadTimeout, BadNoSubscription, BadSessionClosed
14 
15 
class UASocketClient(object):
    """
    handle socket connection and send ua messages
    timeout is the timeout used while waiting for an ua answer from server

    Pending requests are tracked in ``_callbackmap``, which maps a request id
    to the Future that the receiving thread completes when the matching
    message arrives. Id 0 is used for the Hello/Acknowledge handshake.
    """
    def __init__(self, timeout=1, security_policy=ua.SecurityPolicy()):
        self.logger = logging.getLogger(__name__ + ".Socket")
        self._thread = None
        # protects _callbackmap and the request id / request handle counters
        self._lock = Lock()
        self.timeout = timeout
        self._socket = None
        self._do_stop = False
        # read by _create_request_header for every request and overwritten by
        # UaClient.create_session
        # NOTE(review): initial value assumed to be a null NodeId - confirm
        self.authentication_token = ua.NodeId()
        self._request_id = 0
        self._request_handle = 0
        self._callbackmap = {}
        self._connection = ua.SecureConnection(security_policy)

    def start(self):
        """
        Start receiving thread.
        this is called automatically in connect and
        should not be necessary to call directly
        """
        self._thread = Thread(target=self._run)
        self._thread.start()

    def _send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
        """
        send request to server, lower-level method
        timeout is the timeout written in ua header
        returns future

        The future is registered under a new request id before the message is
        written; if callback is given it is attached to the future as a done
        callback. Any error raised while serializing the request is re-raised.
        """
        with self._lock:
            request.RequestHeader = self._create_request_header(timeout)
            self.logger.debug("Sending: %s", request)
            try:
                binreq = request.to_binary()
            except BaseException:
                # reset request handle if any error
                # see self._create_request_header
                self._request_handle -= 1
                raise
            self._request_id += 1
            future = Future()
            if callback:
                future.add_done_callback(callback)
            self._callbackmap[self._request_id] = future
            msg = self._connection.message_to_binary(binreq, message_type=message_type, request_id=self._request_id)
            self._socket.write(msg)
        return future

    def send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
        """
        send request to server.
        timeout is the timeout written in ua header
        returns response object if no callback is provided

        Without a callback this blocks for at most self.timeout waiting for
        the answer and runs check_answer on it before returning it.
        With a callback nothing is returned; the callback receives the future.
        """
        future = self._send_request(request, callback, timeout, message_type)
        if not callback:
            data = future.result(self.timeout)
            self.check_answer(data, " in response to " + request.__class__.__name__)
            return data

    def check_answer(self, data, context):
        """
        Check whether the answer in data is a ServiceFault.
        context is appended to the warning that is logged for a fault.
        Returns True for a regular answer. For a ServiceFault the status of
        its response header is checked, which is expected to raise; False is
        returned if it does not.
        """
        # decode from a copy, presumably so that the caller's buffer is left
        # untouched for its own from_binary() call
        data = data.copy()
        typeid = ua.NodeId.from_binary(data)
        if typeid == ua.FourByteNodeId(ua.ObjectIds.ServiceFault_Encoding_DefaultBinary):
            self.logger.warning("ServiceFault from server received %s", context)
            hdr = ua.ResponseHeader.from_binary(data)
            hdr.ServiceResult.check()
            return False
        return True

    def _run(self):
        """
        Body of the receiving thread: process incoming messages until a stop
        is requested or the socket is closed. Protocol errors are logged and
        the loop carries on.
        """
        self.logger.info("Thread started")
        while not self._do_stop:
            try:
                self._receive()
            except ua.utils.SocketClosedException:
                self.logger.info("Socket has closed connection")
                break
            except UaError:
                self.logger.exception("Protocol Error")
        self.logger.info("Thread ended")

    def _receive(self):
        """
        Read one message from the connection and hand it to the future
        waiting for it. Raises UaError for a message of unsupported type.
        """
        msg = self._connection.receive_from_socket(self._socket)
        if msg is None:
            return
        elif isinstance(msg, ua.Message):
            self._call_callback(msg.request_id(), msg.body())
        elif isinstance(msg, ua.Acknowledge):
            # the answer to Hello is stored under id 0, see send_hello
            self._call_callback(0, msg)
        elif isinstance(msg, ua.ErrorMessage):
            self.logger.warning("Received an error: %s", msg)
        else:
            # format the text ourselves: exceptions do not interpolate
            # logging-style arguments
            raise ua.UaError("Unsupported message type: {0}".format(msg))

    def _call_callback(self, request_id, body):
        """
        Complete the future registered for request_id with body.
        Raises UaError if no future is registered under that id.
        """
        with self._lock:
            future = self._callbackmap.pop(request_id, None)
            if future is None:
                raise ua.UaError("No future object found for request: {0}, callbacks in list are {1}".format(request_id, self._callbackmap.keys()))
        # set the result after releasing the lock: done callbacks run here and
        # may send a new request, which takes the same non-reentrant lock
        future.set_result(body)

    def _create_request_header(self, timeout=1000):
        """
        Build the header of the next request: current authentication token,
        a freshly incremented request handle and timeout as TimeoutHint.
        """
        hdr = ua.RequestHeader()
        hdr.AuthenticationToken = self.authentication_token
        self._request_handle += 1
        hdr.RequestHandle = self._request_handle
        hdr.TimeoutHint = timeout
        return hdr

    def connect_socket(self, host, port):
        """
        connect to server socket and start receiving thread
        """
        self.logger.info("opening connection")
        sock = socket.create_connection((host, port))
        # nodelay necessary to avoid packing in one frame, some servers do not like it
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        self._socket = utils.SocketWrapper(sock)
        self.start()

    def disconnect_socket(self):
        """
        Ask the receiving thread to stop, then shut down and close the socket.
        An error raised by shutdown() is propagated, but the socket is closed
        in any case.
        """
        self.logger.info("stop request")
        self._do_stop = True
        try:
            self._socket.socket.shutdown(socket.SHUT_RDWR)
        finally:
            # release the descriptor even when shutdown() fails
            self._socket.socket.close()

    def send_hello(self, url):
        """
        Send a Hello message for endpoint url and wait at most self.timeout
        for the answer, which is returned.
        """
        hello = ua.Hello()
        hello.EndpointUrl = url
        future = Future()
        with self._lock:
            # Hello carries no request id, its answer is delivered under id 0
            self._callbackmap[0] = future
        binmsg = self._connection.tcp_to_binary(ua.MessageType.Hello, hello)
        self._socket.write(binmsg)
        ack = future.result(self.timeout)
        return ack

    def open_secure_channel(self, params):
        """
        Send an OpenSecureChannel request with params, wait at most
        self.timeout for the response, check its status and store the
        returned channel parameters in the connection.
        Returns the parameters of the response.
        """
        self.logger.info("open_secure_channel")
        request = ua.OpenSecureChannelRequest()
        request.Parameters = params
        future = self._send_request(request, message_type=ua.MessageType.SecureOpen)

        # FIXME: we have a race condition here
        # we can get a packet with the new token id before we reach to store it..
        response = ua.OpenSecureChannelResponse.from_binary(future.result(self.timeout))
        response.ResponseHeader.ServiceResult.check()
        self._connection.set_channel(response.Parameters)
        return response.Parameters

    def close_secure_channel(self):
        """
        close secure channel. It seems to trigger a shutdown of socket
        in most servers, so be prepared to reconnect.
        OPC UA specs Part 6, 7.1.4 say that Server does not send a CloseSecureChannel response and should just close socket
        """
        self.logger.info("close_secure_channel")
        request = ua.CloseSecureChannelRequest()
        future = self._send_request(request, message_type=ua.MessageType.SecureClose)
        with self._lock:
            # don't expect any more answers
            future.cancel()
            self._callbackmap.clear()

        # some servers send a response here, most do not ... so we ignore
185 
186 
class UaClient(object):

    """
    low level OPC-UA client.

    It implements (almost) all methods defined in opcua spec
    taking in argument the structures defined in opcua spec.

    In this Python implementation most of the structures are defined in
    uaprotocol_auto.py and uaprotocol_hand.py available under opcua.ua

    Unless stated otherwise every service method sends one request, blocks
    until the answer arrives or the timeout expires, and raises if the
    ServiceResult of the response does not pass its check.
    """

    def __init__(self, timeout=1):
        self.logger = logging.getLogger(__name__)
        # maps a subscription id to the callback receiving its notifications
        # _publishcallbacks should be accessed in recv thread only
        self._publishcallbacks = {}
        self._timeout = timeout
        self._uasocket = None
        # NOTE(review): default assumed to be a plain ua.SecurityPolicy(),
        # matching the default of UASocketClient - confirm
        self._security_policy = ua.SecurityPolicy()

    def set_security(self, policy):
        """
        Set the security policy used by the next call to connect_socket.
        """
        self._security_policy = policy

    def connect_socket(self, host, port):
        """
        connect to server socket and start receiving thread
        """
        self._uasocket = UASocketClient(self._timeout, security_policy=self._security_policy)
        return self._uasocket.connect_socket(host, port)

    def disconnect_socket(self):
        """
        Close the socket of the underlying socket client.
        """
        return self._uasocket.disconnect_socket()

    def send_hello(self, url):
        """
        Send a Hello message for endpoint url and return the answer.
        """
        return self._uasocket.send_hello(url)

    def open_secure_channel(self, params):
        """
        Open a secure channel with params and return the response parameters.
        """
        return self._uasocket.open_secure_channel(params)

    def close_secure_channel(self):
        """
        close secure channel. It seems to trigger a shutdown of socket
        in most servers, so be prepared to reconnect
        """
        return self._uasocket.close_secure_channel()

    def create_session(self, parameters):
        """
        Create a session and remember its authentication token for all
        following requests. Returns the parameters of the response.
        """
        self.logger.info("create_session")
        request = ua.CreateSessionRequest()
        request.Parameters = parameters
        data = self._uasocket.send_request(request)
        response = ua.CreateSessionResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        self._uasocket.authentication_token = response.Parameters.AuthenticationToken
        return response.Parameters

    def activate_session(self, parameters):
        """
        Activate the session; returns the parameters of the response.
        """
        self.logger.info("activate_session")
        request = ua.ActivateSessionRequest()
        request.Parameters = parameters
        data = self._uasocket.send_request(request)
        response = ua.ActivateSessionResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Parameters

    def close_session(self, deletesubscriptions):
        """
        Close the session. deletesubscriptions is written to the
        DeleteSubscriptions field of the request.
        A BadSessionClosed result is ignored; nothing is returned.
        """
        self.logger.info("close_session")
        request = ua.CloseSessionRequest()
        request.DeleteSubscriptions = deletesubscriptions
        data = self._uasocket.send_request(request)
        response = ua.CloseSessionResponse.from_binary(data)
        try:
            response.ResponseHeader.ServiceResult.check()
        except BadSessionClosed:
            # Problem: closing the session with open publish requests leads to BadSessionClosed responses
            # we can just ignore it therefore.
            # Alternatively we could make sure that there are no publish requests in flight when
            # closing the session.
            pass

    def browse(self, parameters):
        """
        Send a Browse request; returns the Results of the response.
        """
        self.logger.info("browse")
        request = ua.BrowseRequest()
        request.Parameters = parameters
        data = self._uasocket.send_request(request)
        response = ua.BrowseResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def read(self, parameters):
        """
        Send a Read request; returns the Results of the response.
        Good values read from the NodeClass and ValueRank attributes are
        converted to ua.NodeClass and ua.ValueRank.
        """
        self.logger.info("read")
        request = ua.ReadRequest()
        request.Parameters = parameters
        data = self._uasocket.send_request(request)
        response = ua.ReadResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        # convert to Enum the attributes that need it
        for idx, rv in enumerate(parameters.NodesToRead):
            if rv.AttributeId == ua.AttributeIds.NodeClass:
                dv = response.Results[idx]
                if dv.StatusCode.is_good():
                    dv.Value.Value = ua.NodeClass(dv.Value.Value)
            elif rv.AttributeId == ua.AttributeIds.ValueRank:
                dv = response.Results[idx]
                # values outside this range are left as plain integers
                if dv.StatusCode.is_good() and dv.Value.Value in (-3, -2, -1, 0, 1, 2, 3, 4):
                    dv.Value.Value = ua.ValueRank(dv.Value.Value)
        return response.Results

    def write(self, params):
        """
        Send a Write request; returns the Results of the response.
        """
        self.logger.info("write")
        request = ua.WriteRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.WriteResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def get_endpoints(self, params):
        """
        Send a GetEndpoints request; returns the Endpoints of the response.
        """
        self.logger.info("get_endpoint")
        request = ua.GetEndpointsRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.GetEndpointsResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Endpoints

    def find_servers(self, params):
        """
        Send a FindServers request; returns the Servers of the response.
        """
        self.logger.info("find_servers")
        request = ua.FindServersRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.FindServersResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Servers

    def find_servers_on_network(self, params):
        """
        Send a FindServersOnNetwork request; returns the Parameters of the
        response.
        """
        self.logger.info("find_servers_on_network")
        request = ua.FindServersOnNetworkRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.FindServersOnNetworkResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Parameters

    def register_server(self, registered_server):
        """
        Send a RegisterServer request for registered_server.
        """
        self.logger.info("register_server")
        request = ua.RegisterServerRequest()
        request.Server = registered_server
        data = self._uasocket.send_request(request)
        response = ua.RegisterServerResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        # nothing to return for this service

    def register_server2(self, params):
        """
        Send a RegisterServer2 request; returns the ConfigurationResults of
        the response.
        """
        self.logger.info("register_server2")
        request = ua.RegisterServer2Request()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.RegisterServer2Response.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.ConfigurationResults

    def translate_browsepaths_to_nodeids(self, browsepaths):
        """
        Send a TranslateBrowsePathsToNodeIds request for browsepaths;
        returns the Results of the response.
        """
        self.logger.info("translate_browsepath_to_nodeid")
        request = ua.TranslateBrowsePathsToNodeIdsRequest()
        request.Parameters.BrowsePaths = browsepaths
        data = self._uasocket.send_request(request)
        response = ua.TranslateBrowsePathsToNodeIdsResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def create_subscription(self, params, callback):
        """
        Create a subscription and register callback for its notifications.
        Returns the parameters of the response. The response is handled in
        _create_subscription_callback; an error raised there is re-raised
        here, and a timeout error is raised if no answer arrives in time.
        """
        self.logger.info("create_subscription")
        request = ua.CreateSubscriptionRequest()
        request.Parameters = params
        resp_fut = Future()
        mycallbak = partial(self._create_subscription_callback, callback, resp_fut)
        self._uasocket.send_request(request, mycallbak)
        return resp_fut.result(self._timeout)

    def _create_subscription_callback(self, pub_callback, resp_fut, data_fut):
        """
        Done callback of a CreateSubscription request: parse the response
        held by data_fut, register pub_callback under the new subscription
        id and complete resp_fut with the response parameters.
        Any failure is forwarded to resp_fut as its exception.
        """
        self.logger.info("_create_subscription_callback")
        try:
            data = data_fut.result()
            response = ua.CreateSubscriptionResponse.from_binary(data)
            self.logger.debug(response)
            response.ResponseHeader.ServiceResult.check()
            self._publishcallbacks[response.Parameters.SubscriptionId] = pub_callback
        except Exception as exc:
            # an exception escaping a done callback never reaches the thread
            # blocked on resp_fut, which would only see a timeout
            resp_fut.set_exception(exc)
            return
        resp_fut.set_result(response.Parameters)

    def delete_subscriptions(self, subscriptionids):
        """
        Delete the subscriptions listed in subscriptionids and drop their
        callbacks. Returns the Results of the response. The response is
        handled in _delete_subscriptions_callback; an error raised there is
        re-raised here, and a timeout error is raised if no answer arrives
        in time.
        """
        self.logger.info("delete_subscription")
        request = ua.DeleteSubscriptionsRequest()
        request.Parameters.SubscriptionIds = subscriptionids
        resp_fut = Future()
        mycallbak = partial(self._delete_subscriptions_callback, subscriptionids, resp_fut)
        self._uasocket.send_request(request, mycallbak)
        return resp_fut.result(self._timeout)

    def _delete_subscriptions_callback(self, subscriptionids, resp_fut, data_fut):
        """
        Done callback of a DeleteSubscriptions request: parse the response
        held by data_fut, remove the callbacks of subscriptionids and
        complete resp_fut with the Results of the response.
        Any failure is forwarded to resp_fut as its exception.
        """
        self.logger.info("_delete_subscriptions_callback")
        try:
            data = data_fut.result()
            response = ua.DeleteSubscriptionsResponse.from_binary(data)
            self.logger.debug(response)
            response.ResponseHeader.ServiceResult.check()
            for sid in subscriptionids:
                self._publishcallbacks.pop(sid)
        except Exception as exc:
            # an exception escaping a done callback never reaches the thread
            # blocked on resp_fut, which would only see a timeout
            resp_fut.set_exception(exc)
            return
        resp_fut.set_result(response.Results)

    def publish(self, acks=None):
        """
        Send a Publish request carrying the acknowledgements in acks (none
        by default). Does not wait: the answer is handled in
        _call_publish_callback.
        """
        self.logger.info("publish")
        if acks is None:
            acks = []
        request = ua.PublishRequest()
        request.Parameters.SubscriptionAcknowledgements = acks
        # timeout could be set to 0 (= no timeout) but some servers do not support it
        # NOTE(review): TimeoutHint is in milliseconds per OPC UA Part 4, so
        # 9e8 is roughly 250 hours (about 10 days)
        self._uasocket.send_request(request, self._call_publish_callback, timeout=int(9e8))

    def _call_publish_callback(self, future):
        """
        Done callback of a Publish request: validate and parse the response
        held by future, then pass its parameters to the callback registered
        for the subscription. Exceptions raised by that callback are logged
        and not propagated.
        """
        self.logger.info("call_publish_callback")
        data = future.result()

        # check if answer looks ok
        try:
            self._uasocket.check_answer(data, "while waiting for publish response")
        except BadTimeout:  # Spec Part 4, 7.28
            self.publish()
            return
        except BadNoSubscription:  # Spec Part 5, 13.8.1
            # BadNoSubscription is expected after deleting the last subscription.
            #
            # We should therefore also check for len(self._publishcallbacks) == 0, but
            # this gets us into trouble if a Publish response arrives before the
            # DeleteSubscription response.
            #
            # We could remove the callback already when sending the DeleteSubscription request,
            # but there are some legitimate reasons to keep them around, such as when the server
            # responds with "BadTimeout" and we should try again later instead of just removing
            # the subscription client-side.
            #
            # There are a variety of ways to act correctly, but the most practical solution seems
            # to be to just ignore any BadNoSubscription responses.
            self.logger.info("BadNoSubscription received, ignoring because it's probably valid.")
            return

        # parse publish response
        try:
            response = ua.PublishResponse.from_binary(data)
            self.logger.debug(response)
        except Exception:
            # INFO: catching the exception here might be obsolete because we already
            # catch BadTimeout above. However, it's not really clear what this code
            # does so it stays in, doesn't seem to hurt.
            self.logger.exception("Error parsing notificatipn from server")
            # send a publish request to the server so that it does not stop sending notifications
            self.publish([])
            return

        # look for callback
        try:
            callback = self._publishcallbacks[response.Parameters.SubscriptionId]
        except KeyError:
            self.logger.warning("Received data for unknown subscription: %s ", response.Parameters.SubscriptionId)
            return

        # do callback
        try:
            callback(response.Parameters)
        except Exception:  # we call client code, catch everything!
            # logger.exception appends the traceback itself
            self.logger.exception("Exception while calling user callback")

    def create_monitored_items(self, params):
        """
        Send a CreateMonitoredItems request; returns the Results of the
        response.
        """
        self.logger.info("create_monitored_items")
        request = ua.CreateMonitoredItemsRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.CreateMonitoredItemsResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def delete_monitored_items(self, params):
        """
        Send a DeleteMonitoredItems request; returns the Results of the
        response.
        """
        self.logger.info("delete_monitored_items")
        request = ua.DeleteMonitoredItemsRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.DeleteMonitoredItemsResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def add_nodes(self, nodestoadd):
        """
        Send an AddNodes request for nodestoadd; returns the Results of the
        response.
        """
        self.logger.info("add_nodes")
        request = ua.AddNodesRequest()
        request.Parameters.NodesToAdd = nodestoadd
        data = self._uasocket.send_request(request)
        response = ua.AddNodesResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def delete_nodes(self, params):
        """
        Send a DeleteNodes request; returns the Results of the response.
        """
        self.logger.info("delete_nodes")
        request = ua.DeleteNodesRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.DeleteNodesResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def call(self, methodstocall):
        """
        Send a Call request for methodstocall; returns the Results of the
        response.
        """
        self.logger.info("call")
        request = ua.CallRequest()
        request.Parameters.MethodsToCall = methodstocall
        data = self._uasocket.send_request(request)
        response = ua.CallResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def history_read(self, params):
        """
        Send a HistoryRead request; returns the Results of the response.
        """
        self.logger.info("history_read")
        request = ua.HistoryReadRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.HistoryReadResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def modify_monitored_items(self, params):
        """
        Send a ModifyMonitoredItems request; returns the Results of the
        response.
        """
        self.logger.info("modify_monitored_items")
        request = ua.ModifyMonitoredItemsRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.ModifyMonitoredItemsResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results
def _create_subscription_callback(self, pub_callback, resp_fut, data_fut)
Definition: ua_client.py:378
def check_answer(self, data, context)
Definition: ua_client.py:80
def add_nodes(self, nodestoadd)
Definition: ua_client.py:487
def send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage)
Definition: ua_client.py:68
def create_monitored_items(self, params)
Definition: ua_client.py:467
def set_security(self, policy)
Definition: ua_client.py:207
def find_servers_on_network(self, params)
Definition: ua_client.py:329
def publish(self, acks=None)
Definition: ua_client.py:406
def register_server2(self, params)
Definition: ua_client.py:349
def translate_browsepaths_to_nodeids(self, browsepaths)
Definition: ua_client.py:359
def delete_subscriptions(self, subscriptionids)
Definition: ua_client.py:387
def __init__(self, timeout=1, security_policy=ua.SecurityPolicy())
Definition: ua_client.py:21
def create_subscription(self, params, callback)
Definition: ua_client.py:369
def call(self, methodstocall)
Definition: ua_client.py:507
def get_endpoints(self, params)
Definition: ua_client.py:309
def __init__(self, timeout=1)
Definition: ua_client.py:199
def connect_socket(self, host, port)
Definition: ua_client.py:130
def history_read(self, params)
Definition: ua_client.py:516
def activate_session(self, parameters)
Definition: ua_client.py:244
def _create_request_header(self, timeout=1000)
Definition: ua_client.py:122
def _call_callback(self, request_id, body)
Definition: ua_client.py:115
def delete_nodes(self, params)
Definition: ua_client.py:497
def create_session(self, parameters)
Definition: ua_client.py:233
def _send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage)
Definition: ua_client.py:43
def open_secure_channel(self, params)
Definition: ua_client.py:157
def find_servers(self, params)
Definition: ua_client.py:319
def modify_monitored_items(self, params)
Definition: ua_client.py:526
def open_secure_channel(self, params)
Definition: ua_client.py:223
def _call_publish_callback(self, future)
Definition: ua_client.py:415
def _delete_subscriptions_callback(self, subscriptionids, resp_fut, data_fut)
Definition: ua_client.py:396
def connect_socket(self, host, port)
Definition: ua_client.py:210
def browse(self, parameters)
Definition: ua_client.py:269
def read(self, parameters)
Definition: ua_client.py:279
def delete_monitored_items(self, params)
Definition: ua_client.py:477
def close_session(self, deletesubscriptions)
Definition: ua_client.py:254
def register_server(self, registered_server)
Definition: ua_client.py:339


ros_opcua_impl_python_opcua
Author(s): Denis Štogl, Daniel Draper
autogenerated on Tue Jan 19 2021 03:12:44