00001 """
00002 Low level binary client
00003 """
00004
00005 import logging
00006 import socket
00007 from threading import Thread, Lock
00008 from concurrent.futures import Future
00009 from functools import partial
00010
00011 from opcua import ua
00012 from opcua.common import utils
00013 from opcua.ua.uaerrors import UaError, BadTimeout, BadNoSubscription, BadSessionClosed
00014
00015
class UASocketClient(object):
    """
    handle socket connection and send ua messages
    timeout is the timeout used while waiting for an ua answer from server
    """
    def __init__(self, timeout=1, security_policy=ua.SecurityPolicy()):
        """
        timeout is handed to Future.result() whenever an answer is awaited
        security_policy is handed to ua.SecureConnection
        """
        # NOTE: the default security_policy object is built once, when this
        # signature is evaluated, and is therefore shared by every client
        # created without an explicit policy.
        self.logger = logging.getLogger(__name__ + ".Socket")
        self._thread = None
        # held while the request counters and _callbackmap are modified
        self._lock = Lock()
        self.timeout = timeout
        self._socket = None
        self._do_stop = False
        self.authentication_token = ua.NodeId()
        self._request_id = 0
        self._request_handle = 0
        # request id -> Future completed by the receiving thread
        self._callbackmap = {}
        self._connection = ua.SecureConnection(security_policy)

    def start(self):
        """
        Start receiving thread.
        this is called automatically in connect and
        should not be necessary to call directly
        """
        self._thread = Thread(target=self._run)
        self._thread.start()

    def _send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
        """
        send request to server, lower-level method
        timeout is the timeout written in ua header
        returns future
        """
        with self._lock:
            request.RequestHeader = self._create_request_header(timeout)
            self.logger.debug("Sending: %s", request)
            try:
                binreq = request.to_binary()
            except:
                # _create_request_header already incremented the request
                # handle; give it back since nothing will be sent, then
                # re-raise whatever went wrong unchanged.
                self._request_handle -= 1
                raise
            self._request_id += 1
            future = Future()
            if callback:
                future.add_done_callback(callback)
            # registered before writing so the receiving thread can find it
            self._callbackmap[self._request_id] = future
            msg = self._connection.message_to_binary(binreq, message_type=message_type, request_id=self._request_id)
            self._socket.write(msg)
        return future

    def send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
        """
        send request to server.
        timeout is the timeout written in ua header
        returns response object if no callback is provided
        (with a callback nothing is awaited and None is returned)
        """
        future = self._send_request(request, callback, timeout, message_type)
        if not callback:
            data = future.result(self.timeout)
            self.check_answer(data, " in response to " + request.__class__.__name__)
            return data

    def check_answer(self, data, context):
        """
        Inspect the type id at the start of an answer.
        Returns True when the answer is not a ServiceFault. For a
        ServiceFault a warning including context is logged, check() is
        called on the ServiceResult of the response header (callers in
        this file expect it to raise for bad status codes) and False is
        returned if it did not raise.
        """
        # parse from a copy; callers decode the same data again afterwards
        data = data.copy()
        typeid = ua.NodeId.from_binary(data)
        if typeid == ua.FourByteNodeId(ua.ObjectIds.ServiceFault_Encoding_DefaultBinary):
            self.logger.warning("ServiceFault from server received %s", context)
            hdr = ua.ResponseHeader.from_binary(data)
            hdr.ServiceResult.check()
            return False
        return True

    def _run(self):
        """
        Body of the receiving thread: call _receive() until a stop is
        requested or the socket is reported closed. UaError is logged and
        the loop keeps going.
        """
        self.logger.info("Thread started")
        while not self._do_stop:
            try:
                self._receive()
            except ua.utils.SocketClosedException:
                self.logger.info("Socket has closed connection")
                break
            except UaError:
                self.logger.exception("Protocol Error")
        self.logger.info("Thread ended")

    def _receive(self):
        """
        Read one message from the connection and dispatch it:
        ua.Message completes the future of its request id, ua.Acknowledge
        completes the future stored under id 0 (see send_hello),
        ua.ErrorMessage is logged, anything else raises ua.UaError.
        """
        msg = self._connection.receive_from_socket(self._socket)
        if msg is None:
            return
        elif isinstance(msg, ua.Message):
            self._call_callback(msg.request_id(), msg.body())
        elif isinstance(msg, ua.Acknowledge):
            self._call_callback(0, msg)
        elif isinstance(msg, ua.ErrorMessage):
            self.logger.warning("Received an error: %s", msg)
        else:
            # build the text here: exception arguments are not %-interpolated
            raise ua.UaError("Unsupported message type: {0}".format(msg))

    def _call_callback(self, request_id, body):
        """
        Complete the future registered for request_id with body.
        Raises ua.UaError when no future is registered under that id.
        """
        with self._lock:
            future = self._callbackmap.pop(request_id, None)
            if future is None:
                raise ua.UaError("No future object found for request: {0}, callbacks in list are {1}".format(request_id, self._callbackmap.keys()))
        # completed outside the lock; done-callbacks run from set_result
        future.set_result(body)

    def _create_request_header(self, timeout=1000):
        """
        Build a ua.RequestHeader carrying the current authentication token,
        the next request handle and timeout as TimeoutHint.
        """
        hdr = ua.RequestHeader()
        hdr.AuthenticationToken = self.authentication_token
        self._request_handle += 1
        hdr.RequestHandle = self._request_handle
        hdr.TimeoutHint = timeout
        return hdr

    def connect_socket(self, host, port):
        """
        connect to server socket and start receiving thread
        """
        self.logger.info("opening connection")
        sock = socket.create_connection((host, port))
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        self._socket = utils.SocketWrapper(sock)
        self.start()

    def disconnect_socket(self):
        """
        ask the receiving thread to stop, then shut down and close the socket
        """
        self.logger.info("stop request")
        self._do_stop = True
        try:
            self._socket.socket.shutdown(socket.SHUT_RDWR)
        finally:
            # always release the descriptor, even when shutdown() fails
            # (the error from shutdown still propagates to the caller)
            self._socket.socket.close()

    def send_hello(self, url):
        """
        send a Hello message for url and return the answer delivered by the
        receiving thread, waiting at most self.timeout
        """
        hello = ua.Hello()
        hello.EndpointUrl = url
        future = Future()
        with self._lock:
            # _receive() completes the future stored under id 0 when an
            # Acknowledge arrives
            self._callbackmap[0] = future
        binmsg = self._connection.tcp_to_binary(ua.MessageType.Hello, hello)
        self._socket.write(binmsg)
        ack = future.result(self.timeout)
        return ack

    def open_secure_channel(self, params):
        """
        send an OpenSecureChannelRequest, check the service result, register
        the returned channel parameters on the connection and return them
        """
        self.logger.info("open_secure_channel")
        request = ua.OpenSecureChannelRequest()
        request.Parameters = params
        future = self._send_request(request, message_type=ua.MessageType.SecureOpen)

        response = ua.OpenSecureChannelResponse.from_binary(future.result(self.timeout))
        response.ResponseHeader.ServiceResult.check()
        self._connection.set_channel(response.Parameters)
        return response.Parameters

    def close_secure_channel(self):
        """
        close secure channel. It seems to trigger a shutdown of socket
        in most servers, so be prepare to reconnect.
        OPC UA specs Part 6, 7.1.4 say that Server does not send a CloseSecureChannel response and should just close socket
        """
        self.logger.info("close_secure_channel")
        request = ua.CloseSecureChannelRequest()
        future = self._send_request(request, message_type=ua.MessageType.SecureClose)
        with self._lock:
            # no answer is awaited: cancel this future and forget every
            # pending one
            future.cancel()
            self._callbackmap.clear()
00183
00184
00185
00186
class UaClient(object):

    """
    low level OPC-UA client.

    It implements (almost) all methods defined in opcua spec
    taking in argument the structures defined in opcua spec.

    In this Python implementation most of the structures are defined in
    uaprotocol_auto.py and uaprotocol_hand.py available under opcua.ua
    """

    def __init__(self, timeout=1):
        """
        timeout is given to the UASocketClient created in connect_socket and
        is also used when waiting for subscription create/delete answers
        """
        self.logger = logging.getLogger(__name__)
        # SubscriptionId -> callback registered through create_subscription
        self._publishcallbacks = {}
        self._timeout = timeout
        self._uasocket = None
        self._security_policy = ua.SecurityPolicy()

    def set_security(self, policy):
        """
        store the security policy used by the next connect_socket call
        """
        self._security_policy = policy

    def connect_socket(self, host, port):
        """
        connect to server socket and start receiving thread
        """
        self._uasocket = UASocketClient(self._timeout, security_policy=self._security_policy)
        return self._uasocket.connect_socket(host, port)

    def disconnect_socket(self):
        """
        forwarded to UASocketClient.disconnect_socket
        """
        return self._uasocket.disconnect_socket()

    def send_hello(self, url):
        """
        forwarded to UASocketClient.send_hello
        """
        return self._uasocket.send_hello(url)

    def open_secure_channel(self, params):
        """
        forwarded to UASocketClient.open_secure_channel
        """
        return self._uasocket.open_secure_channel(params)

    def close_secure_channel(self):
        """
        close secure channel. It seems to trigger a shutdown of socket
        in most servers, so be prepare to reconnect
        """
        return self._uasocket.close_secure_channel()

    def create_session(self, parameters):
        """
        send a CreateSessionRequest, remember the returned authentication
        token on the socket client and return response.Parameters
        """
        self.logger.info("create_session")
        request = ua.CreateSessionRequest()
        request.Parameters = parameters
        data = self._uasocket.send_request(request)
        response = ua.CreateSessionResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        # every later request header carries this token
        self._uasocket.authentication_token = response.Parameters.AuthenticationToken
        return response.Parameters

    def activate_session(self, parameters):
        """
        send an ActivateSessionRequest and return response.Parameters
        """
        self.logger.info("activate_session")
        request = ua.ActivateSessionRequest()
        request.Parameters = parameters
        data = self._uasocket.send_request(request)
        response = ua.ActivateSessionResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Parameters

    def close_session(self, deletesubscriptions):
        """
        send a CloseSessionRequest; a BadSessionClosed service result is
        ignored, any other bad result is raised. Returns nothing.
        """
        self.logger.info("close_session")
        request = ua.CloseSessionRequest()
        request.DeleteSubscriptions = deletesubscriptions
        data = self._uasocket.send_request(request)
        response = ua.CloseSessionResponse.from_binary(data)
        try:
            response.ResponseHeader.ServiceResult.check()
        except BadSessionClosed:
            # the session being closed already is not treated as an error
            pass

    def browse(self, parameters):
        """
        send a BrowseRequest and return response.Results
        """
        self.logger.info("browse")
        request = ua.BrowseRequest()
        request.Parameters = parameters
        data = self._uasocket.send_request(request)
        response = ua.BrowseResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def read(self, parameters):
        """
        send a ReadRequest and return response.Results.
        Good results for the NodeClass attribute are converted to
        ua.NodeClass; good results for the ValueRank attribute are converted
        to ua.ValueRank when the value is one of -3..4.
        """
        self.logger.info("read")
        request = ua.ReadRequest()
        request.Parameters = parameters
        data = self._uasocket.send_request(request)
        response = ua.ReadResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        # results are matched to the requested nodes by position
        for idx, rv in enumerate(parameters.NodesToRead):
            if rv.AttributeId == ua.AttributeIds.NodeClass:
                dv = response.Results[idx]
                if dv.StatusCode.is_good():
                    dv.Value.Value = ua.NodeClass(dv.Value.Value)
            elif rv.AttributeId == ua.AttributeIds.ValueRank:
                dv = response.Results[idx]
                if dv.StatusCode.is_good() and dv.Value.Value in (-3, -2, -1, 0, 1, 2, 3, 4):
                    dv.Value.Value = ua.ValueRank(dv.Value.Value)
        return response.Results

    def write(self, params):
        """
        send a WriteRequest and return response.Results
        """
        self.logger.info("write")
        request = ua.WriteRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.WriteResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def get_endpoints(self, params):
        """
        send a GetEndpointsRequest and return response.Endpoints
        """
        self.logger.info("get_endpoint")
        request = ua.GetEndpointsRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.GetEndpointsResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Endpoints

    def find_servers(self, params):
        """
        send a FindServersRequest and return response.Servers
        """
        self.logger.info("find_servers")
        request = ua.FindServersRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.FindServersResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Servers

    def find_servers_on_network(self, params):
        """
        send a FindServersOnNetworkRequest and return response.Parameters
        """
        self.logger.info("find_servers_on_network")
        request = ua.FindServersOnNetworkRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.FindServersOnNetworkResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Parameters

    def register_server(self, registered_server):
        """
        send a RegisterServerRequest; only the service result is checked,
        nothing is returned
        """
        self.logger.info("register_server")
        request = ua.RegisterServerRequest()
        request.Server = registered_server
        data = self._uasocket.send_request(request)
        response = ua.RegisterServerResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()

    def register_server2(self, params):
        """
        send a RegisterServer2Request and return response.ConfigurationResults
        """
        self.logger.info("register_server2")
        request = ua.RegisterServer2Request()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.RegisterServer2Response.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.ConfigurationResults

    def translate_browsepaths_to_nodeids(self, browsepaths):
        """
        send a TranslateBrowsePathsToNodeIdsRequest and return response.Results
        """
        self.logger.info("translate_browsepath_to_nodeid")
        request = ua.TranslateBrowsePathsToNodeIdsRequest()
        request.Parameters.BrowsePaths = browsepaths
        data = self._uasocket.send_request(request)
        response = ua.TranslateBrowsePathsToNodeIdsResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def create_subscription(self, params, callback):
        """
        send a CreateSubscriptionRequest and return response.Parameters.
        callback is stored under the new SubscriptionId and later receives
        the parameters of publish responses for that subscription.
        The answer is handled in _create_subscription_callback; its result
        (or the error it hit) is awaited here for at most self._timeout.
        """
        self.logger.info("create_subscription")
        request = ua.CreateSubscriptionRequest()
        request.Parameters = params
        resp_fut = Future()
        mycallbak = partial(self._create_subscription_callback, callback, resp_fut)
        self._uasocket.send_request(request, mycallbak)
        return resp_fut.result(self._timeout)

    def _create_subscription_callback(self, pub_callback, resp_fut, data_fut):
        """
        done-callback of the CreateSubscription request: decode the answer,
        register pub_callback and complete resp_fut
        """
        self.logger.info("_create_subscription_callback")
        try:
            data = data_fut.result()
            response = ua.CreateSubscriptionResponse.from_binary(data)
            self.logger.debug(response)
            response.ResponseHeader.ServiceResult.check()
        except Exception as exc:
            # exceptions raised inside a Future done-callback never reach
            # the thread waiting on resp_fut; hand the error over so the
            # caller gets it instead of a timeout
            resp_fut.set_exception(exc)
            return
        self._publishcallbacks[response.Parameters.SubscriptionId] = pub_callback
        resp_fut.set_result(response.Parameters)

    def delete_subscriptions(self, subscriptionids):
        """
        send a DeleteSubscriptionsRequest for subscriptionids and return
        response.Results; the matching publish callbacks are unregistered.
        The answer is handled in _delete_subscriptions_callback; its result
        (or the error it hit) is awaited here for at most self._timeout.
        """
        self.logger.info("delete_subscription")
        request = ua.DeleteSubscriptionsRequest()
        request.Parameters.SubscriptionIds = subscriptionids
        resp_fut = Future()
        mycallbak = partial(self._delete_subscriptions_callback, subscriptionids, resp_fut)
        self._uasocket.send_request(request, mycallbak)
        return resp_fut.result(self._timeout)

    def _delete_subscriptions_callback(self, subscriptionids, resp_fut, data_fut):
        """
        done-callback of the DeleteSubscriptions request: decode the answer,
        drop the publish callbacks of subscriptionids and complete resp_fut
        """
        self.logger.info("_delete_subscriptions_callback")
        try:
            data = data_fut.result()
            response = ua.DeleteSubscriptionsResponse.from_binary(data)
            self.logger.debug(response)
            response.ResponseHeader.ServiceResult.check()
            for sid in subscriptionids:
                self._publishcallbacks.pop(sid)
        except Exception as exc:
            # same reasoning as in _create_subscription_callback: report
            # the failure to the waiting caller rather than losing it
            resp_fut.set_exception(exc)
        else:
            resp_fut.set_result(response.Results)

    def publish(self, acks=None):
        """
        send a PublishRequest carrying acks (empty list when None).
        Does not wait: the answer is handled by _call_publish_callback.
        """
        self.logger.info("publish")
        if acks is None:
            acks = []
        request = ua.PublishRequest()
        request.Parameters.SubscriptionAcknowledgements = acks
        # a very large timeout hint is written in the request header
        self._uasocket.send_request(request, self._call_publish_callback, timeout=int(9e8))

    def _call_publish_callback(self, future):
        """
        done-callback of a publish request: validate and decode the answer,
        then pass response.Parameters to the callback registered for its
        SubscriptionId. Exceptions raised by that callback are logged.
        """
        self.logger.info("call_publish_callback")
        data = future.result()

        # a ServiceFault answer is turned into an exception by check_answer
        try:
            self._uasocket.check_answer(data, "while waiting for publish response")
        except BadTimeout:
            # the request timed out: send a new one
            self.publish()
            return
        except BadNoSubscription:
            self.logger.info("BadNoSubscription received, ignoring because it's probably valid.")
            return

        # decode the answer; on failure log and send an empty publish request
        try:
            response = ua.PublishResponse.from_binary(data)
            self.logger.debug(response)
        except Exception:
            self.logger.exception("Error parsing notificatipn from server")
            self.publish([])
            return

        # look up the callback registered by create_subscription
        try:
            callback = self._publishcallbacks[response.Parameters.SubscriptionId]
        except KeyError:
            self.logger.warning("Received data for unknown subscription: %s ", response.Parameters.SubscriptionId)
            return

        # user code is called here, so anything it raises is only logged
        try:
            callback(response.Parameters)
        except Exception:
            self.logger.exception("Exception while calling user callback: %s", callback)

    def create_monitored_items(self, params):
        """
        send a CreateMonitoredItemsRequest and return response.Results
        """
        self.logger.info("create_monitored_items")
        request = ua.CreateMonitoredItemsRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.CreateMonitoredItemsResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def delete_monitored_items(self, params):
        """
        send a DeleteMonitoredItemsRequest and return response.Results
        """
        self.logger.info("delete_monitored_items")
        request = ua.DeleteMonitoredItemsRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.DeleteMonitoredItemsResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def add_nodes(self, nodestoadd):
        """
        send an AddNodesRequest for nodestoadd and return response.Results
        """
        self.logger.info("add_nodes")
        request = ua.AddNodesRequest()
        request.Parameters.NodesToAdd = nodestoadd
        data = self._uasocket.send_request(request)
        response = ua.AddNodesResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def delete_nodes(self, params):
        """
        send a DeleteNodesRequest and return response.Results
        """
        self.logger.info("delete_nodes")
        request = ua.DeleteNodesRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.DeleteNodesResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def call(self, methodstocall):
        """
        send a CallRequest for methodstocall and return response.Results
        """
        request = ua.CallRequest()
        request.Parameters.MethodsToCall = methodstocall
        data = self._uasocket.send_request(request)
        response = ua.CallResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def history_read(self, params):
        """
        send a HistoryReadRequest and return response.Results
        """
        self.logger.info("history_read")
        request = ua.HistoryReadRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.HistoryReadResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def modify_monitored_items(self, params):
        """
        send a ModifyMonitoredItemsRequest and return response.Results
        """
        self.logger.info("modify_monitored_items")
        request = ua.ModifyMonitoredItemsRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.ModifyMonitoredItemsResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results