ua_client.py
Go to the documentation of this file.
00001 """
00002 Low level binary client
00003 """
00004 
00005 import logging
00006 import socket
00007 from threading import Thread, Lock
00008 from concurrent.futures import Future
00009 from functools import partial
00010 
00011 from opcua import ua
00012 from opcua.common import utils
00013 from opcua.ua.uaerrors import UaError, BadTimeout, BadNoSubscription, BadSessionClosed
00014 
00015 
00016 class UASocketClient(object):
00017     """
00018     handle socket connection and send ua messages
00019     timeout is the timeout used while waiting for an ua answer from server
00020     """
00021     def __init__(self, timeout=1, security_policy=ua.SecurityPolicy()):
00022         self.logger = logging.getLogger(__name__ + ".Socket")
00023         self._thread = None
00024         self._lock = Lock()
00025         self.timeout = timeout
00026         self._socket = None
00027         self._do_stop = False
00028         self.authentication_token = ua.NodeId()
00029         self._request_id = 0
00030         self._request_handle = 0
00031         self._callbackmap = {}
00032         self._connection = ua.SecureConnection(security_policy)
00033 
00034     def start(self):
00035         """
00036         Start receiving thread.
00037         this is called automatically in connect and
00038         should not be necessary to call directly
00039         """
00040         self._thread = Thread(target=self._run)
00041         self._thread.start()
00042 
00043     def _send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
00044         """
00045         send request to server, lower-level method
00046         timeout is the timeout written in ua header
00047         returns future
00048         """
00049         with self._lock:
00050             request.RequestHeader = self._create_request_header(timeout)
00051             self.logger.debug("Sending: %s", request)
00052             try:
00053                 binreq = request.to_binary()
00054             except:
00055                 # reset reqeust handle if any error
00056                 # see self._create_request_header
00057                 self._request_handle -= 1
00058                 raise
00059             self._request_id += 1
00060             future = Future()
00061             if callback:
00062                 future.add_done_callback(callback)
00063             self._callbackmap[self._request_id] = future
00064             msg = self._connection.message_to_binary(binreq, message_type=message_type, request_id=self._request_id)
00065             self._socket.write(msg)
00066         return future
00067 
00068     def send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
00069         """
00070         send request to server.
00071         timeout is the timeout written in ua header
00072         returns response object if no callback is provided
00073         """
00074         future = self._send_request(request, callback, timeout, message_type)
00075         if not callback:
00076             data = future.result(self.timeout)
00077             self.check_answer(data, " in response to " + request.__class__.__name__)
00078             return data
00079 
00080     def check_answer(self, data, context):
00081         data = data.copy()
00082         typeid = ua.NodeId.from_binary(data)
00083         if typeid == ua.FourByteNodeId(ua.ObjectIds.ServiceFault_Encoding_DefaultBinary):
00084             self.logger.warning("ServiceFault from server received %s", context)
00085             hdr = ua.ResponseHeader.from_binary(data)
00086             hdr.ServiceResult.check()
00087             return False
00088         return True
00089 
00090     def _run(self):
00091         self.logger.info("Thread started")
00092         while not self._do_stop:
00093             try:
00094                 self._receive()
00095             except ua.utils.SocketClosedException:
00096                 self.logger.info("Socket has closed connection")
00097                 break
00098             except UaError:
00099                 self.logger.exception("Protocol Error")
00100         self.logger.info("Thread ended")
00101 
00102     def _receive(self):
00103         msg = self._connection.receive_from_socket(self._socket)
00104         if msg is None:
00105             return
00106         elif isinstance(msg, ua.Message):
00107             self._call_callback(msg.request_id(), msg.body())
00108         elif isinstance(msg, ua.Acknowledge):
00109             self._call_callback(0, msg)
00110         elif isinstance(msg, ua.ErrorMessage):
00111             self.logger.warning("Received an error: %s", msg)
00112         else:
00113             raise ua.UaError("Unsupported message type: %s", msg)
00114 
00115     def _call_callback(self, request_id, body):
00116         with self._lock:
00117             future = self._callbackmap.pop(request_id, None)
00118             if future is None:
00119                 raise ua.UaError("No future object found for request: {0}, callbacks in list are {1}".format(request_id, self._callbackmap.keys()))
00120         future.set_result(body)
00121 
00122     def _create_request_header(self, timeout=1000):
00123         hdr = ua.RequestHeader()
00124         hdr.AuthenticationToken = self.authentication_token
00125         self._request_handle += 1
00126         hdr.RequestHandle = self._request_handle
00127         hdr.TimeoutHint = timeout
00128         return hdr
00129 
00130     def connect_socket(self, host, port):
00131         """
00132         connect to server socket and start receiving thread
00133         """
00134         self.logger.info("opening connection")
00135         sock = socket.create_connection((host, port))
00136         sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)  # nodelay ncessary to avoid packing in one frame, some servers do not like it
00137         self._socket = utils.SocketWrapper(sock)
00138         self.start()
00139 
00140     def disconnect_socket(self):
00141         self.logger.info("stop request")
00142         self._do_stop = True
00143         self._socket.socket.shutdown(socket.SHUT_RDWR)
00144         self._socket.socket.close()
00145 
00146     def send_hello(self, url):
00147         hello = ua.Hello()
00148         hello.EndpointUrl = url
00149         future = Future()
00150         with self._lock:
00151             self._callbackmap[0] = future
00152         binmsg = self._connection.tcp_to_binary(ua.MessageType.Hello, hello)
00153         self._socket.write(binmsg)
00154         ack = future.result(self.timeout)
00155         return ack
00156 
00157     def open_secure_channel(self, params):
00158         self.logger.info("open_secure_channel")
00159         request = ua.OpenSecureChannelRequest()
00160         request.Parameters = params
00161         future = self._send_request(request, message_type=ua.MessageType.SecureOpen)
00162         
00163         # FIXME: we have a race condition here
00164         # we can get a packet with the new token id before we reach to store it..
00165         response = ua.OpenSecureChannelResponse.from_binary(future.result(self.timeout))
00166         response.ResponseHeader.ServiceResult.check()
00167         self._connection.set_channel(response.Parameters)
00168         return response.Parameters
00169 
00170     def close_secure_channel(self):
00171         """
00172         close secure channel. It seems to trigger a shutdown of socket
00173         in most servers, so be prepare to reconnect.
00174         OPC UA specs Part 6, 7.1.4 say that Server does not send a CloseSecureChannel response and should just close socket
00175         """
00176         self.logger.info("close_secure_channel")
00177         request = ua.CloseSecureChannelRequest()
00178         future = self._send_request(request, message_type=ua.MessageType.SecureClose)
00179         with self._lock:
00180             # don't expect any more answers
00181             future.cancel()
00182             self._callbackmap.clear()
00183 
00184         # some servers send a response here, most do not ... so we ignore
00185 
00186 
00187 class UaClient(object):
00188 
00189     """
00190     low level OPC-UA client.
00191 
00192     It implements (almost) all methods defined in opcua spec
00193     taking in argument the structures defined in opcua spec.
00194 
00195     In this Python implementation  most of the structures are defined in
00196     uaprotocol_auto.py and uaprotocol_hand.py available under opcua.ua
00197     """
00198 
00199     def __init__(self, timeout=1):
00200         self.logger = logging.getLogger(__name__)
00201         # _publishcallbacks should be accessed in recv thread only
00202         self._publishcallbacks = {}
00203         self._timeout = timeout
00204         self._uasocket = None
00205         self._security_policy = ua.SecurityPolicy()
00206 
00207     def set_security(self, policy):
00208         self._security_policy = policy
00209 
00210     def connect_socket(self, host, port):
00211         """
00212         connect to server socket and start receiving thread
00213         """
00214         self._uasocket = UASocketClient(self._timeout, security_policy=self._security_policy)
00215         return self._uasocket.connect_socket(host, port)
00216 
00217     def disconnect_socket(self):
00218         return self._uasocket.disconnect_socket()
00219 
00220     def send_hello(self, url):
00221         return self._uasocket.send_hello(url)
00222 
00223     def open_secure_channel(self, params):
00224         return self._uasocket.open_secure_channel(params)
00225 
00226     def close_secure_channel(self):
00227         """
00228         close secure channel. It seems to trigger a shutdown of socket
00229         in most servers, so be prepare to reconnect
00230         """
00231         return self._uasocket.close_secure_channel()
00232 
00233     def create_session(self, parameters):
00234         self.logger.info("create_session")
00235         request = ua.CreateSessionRequest()
00236         request.Parameters = parameters
00237         data = self._uasocket.send_request(request)
00238         response = ua.CreateSessionResponse.from_binary(data)
00239         self.logger.debug(response)
00240         response.ResponseHeader.ServiceResult.check()
00241         self._uasocket.authentication_token = response.Parameters.AuthenticationToken
00242         return response.Parameters
00243 
00244     def activate_session(self, parameters):
00245         self.logger.info("activate_session")
00246         request = ua.ActivateSessionRequest()
00247         request.Parameters = parameters
00248         data = self._uasocket.send_request(request)
00249         response = ua.ActivateSessionResponse.from_binary(data)
00250         self.logger.debug(response)
00251         response.ResponseHeader.ServiceResult.check()
00252         return response.Parameters
00253 
00254     def close_session(self, deletesubscriptions):
00255         self.logger.info("close_session")
00256         request = ua.CloseSessionRequest()
00257         request.DeleteSubscriptions = deletesubscriptions
00258         data = self._uasocket.send_request(request)
00259         response = ua.CloseSessionResponse.from_binary(data)
00260         try:
00261             response.ResponseHeader.ServiceResult.check()
00262         except BadSessionClosed:
00263             # Problem: closing the session with open publish requests leads to BadSessionClosed responses
00264             #          we can just ignore it therefore.
00265             #          Alternatively we could make sure that there are no publish requests in flight when
00266             #          closing the session.
00267             pass
00268 
00269     def browse(self, parameters):
00270         self.logger.info("browse")
00271         request = ua.BrowseRequest()
00272         request.Parameters = parameters
00273         data = self._uasocket.send_request(request)
00274         response = ua.BrowseResponse.from_binary(data)
00275         self.logger.debug(response)
00276         response.ResponseHeader.ServiceResult.check()
00277         return response.Results
00278 
00279     def read(self, parameters):
00280         self.logger.info("read")
00281         request = ua.ReadRequest()
00282         request.Parameters = parameters
00283         data = self._uasocket.send_request(request)
00284         response = ua.ReadResponse.from_binary(data)
00285         self.logger.debug(response)
00286         response.ResponseHeader.ServiceResult.check()
00287         # cast to Enum attributes that need to
00288         for idx, rv in enumerate(parameters.NodesToRead):
00289             if rv.AttributeId == ua.AttributeIds.NodeClass:
00290                 dv = response.Results[idx]
00291                 if dv.StatusCode.is_good():
00292                     dv.Value.Value = ua.NodeClass(dv.Value.Value)
00293             elif rv.AttributeId == ua.AttributeIds.ValueRank:
00294                 dv = response.Results[idx]
00295                 if dv.StatusCode.is_good() and dv.Value.Value in (-3, -2, -1, 0, 1, 2, 3, 4):
00296                     dv.Value.Value = ua.ValueRank(dv.Value.Value)
00297         return response.Results
00298 
00299     def write(self, params):
00300         self.logger.info("read")
00301         request = ua.WriteRequest()
00302         request.Parameters = params
00303         data = self._uasocket.send_request(request)
00304         response = ua.WriteResponse.from_binary(data)
00305         self.logger.debug(response)
00306         response.ResponseHeader.ServiceResult.check()
00307         return response.Results
00308 
00309     def get_endpoints(self, params):
00310         self.logger.info("get_endpoint")
00311         request = ua.GetEndpointsRequest()
00312         request.Parameters = params
00313         data = self._uasocket.send_request(request)
00314         response = ua.GetEndpointsResponse.from_binary(data)
00315         self.logger.debug(response)
00316         response.ResponseHeader.ServiceResult.check()
00317         return response.Endpoints
00318 
00319     def find_servers(self, params):
00320         self.logger.info("find_servers")
00321         request = ua.FindServersRequest()
00322         request.Parameters = params
00323         data = self._uasocket.send_request(request)
00324         response = ua.FindServersResponse.from_binary(data)
00325         self.logger.debug(response)
00326         response.ResponseHeader.ServiceResult.check()
00327         return response.Servers
00328 
00329     def find_servers_on_network(self, params):
00330         self.logger.info("find_servers_on_network")
00331         request = ua.FindServersOnNetworkRequest()
00332         request.Parameters = params
00333         data = self._uasocket.send_request(request)
00334         response = ua.FindServersOnNetworkResponse.from_binary(data)
00335         self.logger.debug(response)
00336         response.ResponseHeader.ServiceResult.check()
00337         return response.Parameters
00338 
00339     def register_server(self, registered_server):
00340         self.logger.info("register_server")
00341         request = ua.RegisterServerRequest()
00342         request.Server = registered_server
00343         data = self._uasocket.send_request(request)
00344         response = ua.RegisterServerResponse.from_binary(data)
00345         self.logger.debug(response)
00346         response.ResponseHeader.ServiceResult.check()
00347         # nothing to return for this service
00348 
00349     def register_server2(self, params):
00350         self.logger.info("register_server2")
00351         request = ua.RegisterServer2Request()
00352         request.Parameters = params
00353         data = self._uasocket.send_request(request)
00354         response = ua.RegisterServer2Response.from_binary(data)
00355         self.logger.debug(response)
00356         response.ResponseHeader.ServiceResult.check()
00357         return response.ConfigurationResults
00358 
00359     def translate_browsepaths_to_nodeids(self, browsepaths):
00360         self.logger.info("translate_browsepath_to_nodeid")
00361         request = ua.TranslateBrowsePathsToNodeIdsRequest()
00362         request.Parameters.BrowsePaths = browsepaths
00363         data = self._uasocket.send_request(request)
00364         response = ua.TranslateBrowsePathsToNodeIdsResponse.from_binary(data)
00365         self.logger.debug(response)
00366         response.ResponseHeader.ServiceResult.check()
00367         return response.Results
00368 
00369     def create_subscription(self, params, callback):
00370         self.logger.info("create_subscription")
00371         request = ua.CreateSubscriptionRequest()
00372         request.Parameters = params
00373         resp_fut = Future()
00374         mycallbak = partial(self._create_subscription_callback, callback, resp_fut)
00375         self._uasocket.send_request(request, mycallbak)
00376         return resp_fut.result(self._timeout)
00377 
00378     def _create_subscription_callback(self, pub_callback, resp_fut, data_fut):
00379         self.logger.info("_create_subscription_callback")
00380         data = data_fut.result()
00381         response = ua.CreateSubscriptionResponse.from_binary(data)
00382         self.logger.debug(response)
00383         response.ResponseHeader.ServiceResult.check()
00384         self._publishcallbacks[response.Parameters.SubscriptionId] = pub_callback
00385         resp_fut.set_result(response.Parameters)
00386 
00387     def delete_subscriptions(self, subscriptionids):
00388         self.logger.info("delete_subscription")
00389         request = ua.DeleteSubscriptionsRequest()
00390         request.Parameters.SubscriptionIds = subscriptionids
00391         resp_fut = Future()
00392         mycallbak = partial(self._delete_subscriptions_callback, subscriptionids, resp_fut)
00393         self._uasocket.send_request(request, mycallbak)
00394         return resp_fut.result(self._timeout)
00395 
00396     def _delete_subscriptions_callback(self, subscriptionids, resp_fut, data_fut):
00397         self.logger.info("_delete_subscriptions_callback")
00398         data = data_fut.result()
00399         response = ua.DeleteSubscriptionsResponse.from_binary(data)
00400         self.logger.debug(response)
00401         response.ResponseHeader.ServiceResult.check()
00402         for sid in subscriptionids:
00403             self._publishcallbacks.pop(sid)
00404         resp_fut.set_result(response.Results)
00405 
00406     def publish(self, acks=None):
00407         self.logger.info("publish")
00408         if acks is None:
00409             acks = []
00410         request = ua.PublishRequest()
00411         request.Parameters.SubscriptionAcknowledgements = acks
00412         # timeout could be set to 0 (= no timeout) but some servers do not support it
00413         self._uasocket.send_request(request, self._call_publish_callback, timeout=int(9e8)) # 250 days
00414 
00415     def _call_publish_callback(self, future):
00416         self.logger.info("call_publish_callback")
00417         data = future.result()
00418 
00419         # check if answer looks ok
00420         try:
00421             self._uasocket.check_answer(data, "while waiting for publish response")
00422         except BadTimeout: # Spec Part 4, 7.28
00423             self.publish()
00424             return
00425         except BadNoSubscription: # Spec Part 5, 13.8.1
00426             # BadNoSubscription is expected after deleting the last subscription.
00427             #
00428             # We should therefore also check for len(self._publishcallbacks) == 0, but
00429             # this gets us into trouble if a Publish response arrives before the
00430             # DeleteSubscription response.
00431             #
00432             # We could remove the callback already when sending the DeleteSubscription request,
00433             # but there are some legitimate reasons to keep them around, such as when the server
00434             # responds with "BadTimeout" and we should try again later instead of just removing
00435             # the subscription client-side.
00436             #
00437             # There are a variety of ways to act correctly, but the most practical solution seems
00438             # to be to just ignore any BadNoSubscription responses.
00439             self.logger.info("BadNoSubscription received, ignoring because it's probably valid.")
00440             return
00441 
00442         # parse publish response
00443         try:
00444             response = ua.PublishResponse.from_binary(data)
00445             self.logger.debug(response)
00446         except Exception:
00447             # INFO: catching the exception here might be obsolete because we already
00448             #       catch BadTimeout above. However, it's not really clear what this code
00449             #       does so it stays in, doesn't seem to hurt.
00450             self.logger.exception("Error parsing notificatipn from server")
00451             self.publish([]) #send publish request ot server so he does stop sending notifications
00452             return
00453 
00454         # look for callback
00455         try:
00456             callback = self._publishcallbacks[response.Parameters.SubscriptionId]
00457         except KeyError:
00458             self.logger.warning("Received data for unknown subscription: %s ", response.Parameters.SubscriptionId)
00459             return
00460 
00461         # do callback
00462         try:
00463             callback(response.Parameters)
00464         except Exception:  # we call client code, catch everything!
00465             self.logger.exception("Exception while calling user callback: %s")
00466 
00467     def create_monitored_items(self, params):
00468         self.logger.info("create_monitored_items")
00469         request = ua.CreateMonitoredItemsRequest()
00470         request.Parameters = params
00471         data = self._uasocket.send_request(request)
00472         response = ua.CreateMonitoredItemsResponse.from_binary(data)
00473         self.logger.debug(response)
00474         response.ResponseHeader.ServiceResult.check()
00475         return response.Results
00476 
00477     def delete_monitored_items(self, params):
00478         self.logger.info("delete_monitored_items")
00479         request = ua.DeleteMonitoredItemsRequest()
00480         request.Parameters = params
00481         data = self._uasocket.send_request(request)
00482         response = ua.DeleteMonitoredItemsResponse.from_binary(data)
00483         self.logger.debug(response)
00484         response.ResponseHeader.ServiceResult.check()
00485         return response.Results
00486 
00487     def add_nodes(self, nodestoadd):
00488         self.logger.info("add_nodes")
00489         request = ua.AddNodesRequest()
00490         request.Parameters.NodesToAdd = nodestoadd
00491         data = self._uasocket.send_request(request)
00492         response = ua.AddNodesResponse.from_binary(data)
00493         self.logger.debug(response)
00494         response.ResponseHeader.ServiceResult.check()
00495         return response.Results
00496 
00497     def delete_nodes(self, params):
00498         self.logger.info("delete_nodes")
00499         request = ua.DeleteNodesRequest()
00500         request.Parameters = params
00501         data = self._uasocket.send_request(request)
00502         response = ua.DeleteNodesResponse.from_binary(data)
00503         self.logger.debug(response)
00504         response.ResponseHeader.ServiceResult.check()
00505         return response.Results
00506 
00507     def call(self, methodstocall):
00508         request = ua.CallRequest()
00509         request.Parameters.MethodsToCall = methodstocall
00510         data = self._uasocket.send_request(request)
00511         response = ua.CallResponse.from_binary(data)
00512         self.logger.debug(response)
00513         response.ResponseHeader.ServiceResult.check()
00514         return response.Results
00515 
00516     def history_read(self, params):
00517         self.logger.info("history_read")
00518         request = ua.HistoryReadRequest()
00519         request.Parameters = params
00520         data = self._uasocket.send_request(request)
00521         response = ua.HistoryReadResponse.from_binary(data)
00522         self.logger.debug(response)
00523         response.ResponseHeader.ServiceResult.check()
00524         return response.Results
00525 
00526     def modify_monitored_items(self, params):
00527         self.logger.info("modify_monitored_items")
00528         request = ua.ModifyMonitoredItemsRequest()
00529         request.Parameters = params
00530         data = self._uasocket.send_request(request)
00531         response = ua.ModifyMonitoredItemsResponse.from_binary(data)
00532         self.logger.debug(response)
00533         response.ResponseHeader.ServiceResult.check()
00534         return response.Results


ros_opcua_impl_python_opcua
Author(s): Denis Štogl , Daniel Draper
autogenerated on Sat Jun 8 2019 18:26:23