2 Low level binary client 7 from threading
import Thread, Lock
8 from concurrent.futures
import Future
9 from functools
import partial
13 from opcua.ua.uaerrors import UaError, BadTimeout, BadNoSubscription, BadSessionClosed
18 handle socket connection and send ua messages 19 timeout is the timeout used while waiting for a ua answer from server 21 def __init__(self, timeout=1, security_policy=ua.SecurityPolicy()):
22 self.
logger = logging.getLogger(__name__ +
".Socket")
36 Start receiving thread. 37 this is called automatically in connect and 38 should not be necessary to call directly 43 def _send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
45 send request to server, lower-level method 46 timeout is the timeout written in ua header 51 self.logger.debug(
"Sending: %s", request)
53 binreq = request.to_binary()
62 future.add_done_callback(callback)
64 msg = self._connection.message_to_binary(binreq, message_type=message_type, request_id=self.
_request_id)
65 self._socket.write(msg)
68 def send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
70 send request to server. 71 timeout is the timeout written in ua header 72 returns response object if no callback is provided 74 future = self.
_send_request(request, callback, timeout, message_type)
76 data = future.result(self.
timeout)
77 self.
check_answer(data,
" in response to " + request.__class__.__name__)
82 typeid = ua.NodeId.from_binary(data)
84 self.logger.warning(
"ServiceFault from server received %s", context)
85 hdr = ua.ResponseHeader.from_binary(data)
86 hdr.ServiceResult.check()
91 self.logger.info(
"Thread started")
95 except ua.utils.SocketClosedException:
96 self.logger.info(
"Socket has closed connection")
99 self.logger.exception(
"Protocol Error")
100 self.logger.info(
"Thread ended")
103 msg = self._connection.receive_from_socket(self.
_socket)
111 self.logger.warning(
"Received an error: %s", msg)
113 raise ua.UaError(
"Unsupported message type: %s", msg)
117 future = self._callbackmap.pop(request_id,
None)
119 raise ua.UaError(
"No future object found for request: {0}, callbacks in list are {1}".format(request_id, self._callbackmap.keys()))
120 future.set_result(body)
127 hdr.TimeoutHint = timeout
132 connect to server socket and start receiving thread 134 self.logger.info(
"opening connection")
135 sock = socket.create_connection((host, port))
136 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
137 self.
_socket = utils.SocketWrapper(sock)
141 self.logger.info(
"stop request")
143 self._socket.socket.shutdown(socket.SHUT_RDWR)
144 self._socket.socket.close()
148 hello.EndpointUrl = url
152 binmsg = self._connection.tcp_to_binary(ua.MessageType.Hello, hello)
153 self._socket.write(binmsg)
154 ack = future.result(self.
timeout)
158 self.logger.info(
"open_secure_channel")
160 request.Parameters = params
161 future = self.
_send_request(request, message_type=ua.MessageType.SecureOpen)
165 response = ua.OpenSecureChannelResponse.from_binary(future.result(self.
timeout))
166 response.ResponseHeader.ServiceResult.check()
167 self._connection.set_channel(response.Parameters)
168 return response.Parameters
172 close secure channel. It seems to trigger a shutdown of socket 173 in most servers, so be prepared to reconnect. 174 OPC UA specs Part 6, 7.1.4 say that Server does not send a CloseSecureChannel response and should just close socket 176 self.logger.info(
"close_secure_channel")
178 future = self.
_send_request(request, message_type=ua.MessageType.SecureClose)
182 self._callbackmap.clear()
190 low level OPC-UA client. 192 It implements (almost) all methods defined in opcua spec 193 taking in argument the structures defined in opcua spec. 195 In this Python implementation most of the structures are defined in 196 uaprotocol_auto.py and uaprotocol_hand.py available under opcua.ua 200 self.
logger = logging.getLogger(__name__)
212 connect to server socket and start receiving thread 215 return self._uasocket.connect_socket(host, port)
218 return self._uasocket.disconnect_socket()
221 return self._uasocket.send_hello(url)
224 return self._uasocket.open_secure_channel(params)
228 close secure channel. It seems to trigger a shutdown of socket 229 in most servers, so be prepared to reconnect 231 return self._uasocket.close_secure_channel()
234 self.logger.info(
"create_session")
236 request.Parameters = parameters
237 data = self._uasocket.send_request(request)
238 response = ua.CreateSessionResponse.from_binary(data)
239 self.logger.debug(response)
240 response.ResponseHeader.ServiceResult.check()
241 self._uasocket.authentication_token = response.Parameters.AuthenticationToken
242 return response.Parameters
245 self.logger.info(
"activate_session")
247 request.Parameters = parameters
248 data = self._uasocket.send_request(request)
249 response = ua.ActivateSessionResponse.from_binary(data)
250 self.logger.debug(response)
251 response.ResponseHeader.ServiceResult.check()
252 return response.Parameters
255 self.logger.info(
"close_session")
257 request.DeleteSubscriptions = deletesubscriptions
258 data = self._uasocket.send_request(request)
259 response = ua.CloseSessionResponse.from_binary(data)
261 response.ResponseHeader.ServiceResult.check()
262 except BadSessionClosed:
270 self.logger.info(
"browse")
272 request.Parameters = parameters
273 data = self._uasocket.send_request(request)
274 response = ua.BrowseResponse.from_binary(data)
275 self.logger.debug(response)
276 response.ResponseHeader.ServiceResult.check()
277 return response.Results
280 self.logger.info(
"read")
282 request.Parameters = parameters
283 data = self._uasocket.send_request(request)
284 response = ua.ReadResponse.from_binary(data)
285 self.logger.debug(response)
286 response.ResponseHeader.ServiceResult.check()
288 for idx, rv
in enumerate(parameters.NodesToRead):
289 if rv.AttributeId == ua.AttributeIds.NodeClass:
290 dv = response.Results[idx]
291 if dv.StatusCode.is_good():
293 elif rv.AttributeId == ua.AttributeIds.ValueRank:
294 dv = response.Results[idx]
295 if dv.StatusCode.is_good()
and dv.Value.Value
in (-3, -2, -1, 0, 1, 2, 3, 4):
297 return response.Results
300 self.logger.info(
"read")
302 request.Parameters = params
303 data = self._uasocket.send_request(request)
304 response = ua.WriteResponse.from_binary(data)
305 self.logger.debug(response)
306 response.ResponseHeader.ServiceResult.check()
307 return response.Results
310 self.logger.info(
"get_endpoint")
312 request.Parameters = params
313 data = self._uasocket.send_request(request)
314 response = ua.GetEndpointsResponse.from_binary(data)
315 self.logger.debug(response)
316 response.ResponseHeader.ServiceResult.check()
317 return response.Endpoints
320 self.logger.info(
"find_servers")
322 request.Parameters = params
323 data = self._uasocket.send_request(request)
324 response = ua.FindServersResponse.from_binary(data)
325 self.logger.debug(response)
326 response.ResponseHeader.ServiceResult.check()
327 return response.Servers
330 self.logger.info(
"find_servers_on_network")
332 request.Parameters = params
333 data = self._uasocket.send_request(request)
334 response = ua.FindServersOnNetworkResponse.from_binary(data)
335 self.logger.debug(response)
336 response.ResponseHeader.ServiceResult.check()
337 return response.Parameters
340 self.logger.info(
"register_server")
342 request.Server = registered_server
343 data = self._uasocket.send_request(request)
344 response = ua.RegisterServerResponse.from_binary(data)
345 self.logger.debug(response)
346 response.ResponseHeader.ServiceResult.check()
350 self.logger.info(
"register_server2")
352 request.Parameters = params
353 data = self._uasocket.send_request(request)
354 response = ua.RegisterServer2Response.from_binary(data)
355 self.logger.debug(response)
356 response.ResponseHeader.ServiceResult.check()
357 return response.ConfigurationResults
360 self.logger.info(
"translate_browsepath_to_nodeid")
362 request.Parameters.BrowsePaths = browsepaths
363 data = self._uasocket.send_request(request)
364 response = ua.TranslateBrowsePathsToNodeIdsResponse.from_binary(data)
365 self.logger.debug(response)
366 response.ResponseHeader.ServiceResult.check()
367 return response.Results
370 self.logger.info(
"create_subscription")
372 request.Parameters = params
375 self._uasocket.send_request(request, mycallbak)
376 return resp_fut.result(self.
_timeout)
379 self.logger.info(
"_create_subscription_callback")
380 data = data_fut.result()
381 response = ua.CreateSubscriptionResponse.from_binary(data)
382 self.logger.debug(response)
383 response.ResponseHeader.ServiceResult.check()
385 resp_fut.set_result(response.Parameters)
388 self.logger.info(
"delete_subscription")
390 request.Parameters.SubscriptionIds = subscriptionids
393 self._uasocket.send_request(request, mycallbak)
394 return resp_fut.result(self.
_timeout)
397 self.logger.info(
"_delete_subscriptions_callback")
398 data = data_fut.result()
399 response = ua.DeleteSubscriptionsResponse.from_binary(data)
400 self.logger.debug(response)
401 response.ResponseHeader.ServiceResult.check()
402 for sid
in subscriptionids:
403 self._publishcallbacks.pop(sid)
404 resp_fut.set_result(response.Results)
407 self.logger.info(
"publish")
411 request.Parameters.SubscriptionAcknowledgements = acks
416 self.logger.info(
"call_publish_callback")
417 data = future.result()
421 self._uasocket.check_answer(data,
"while waiting for publish response")
425 except BadNoSubscription:
439 self.logger.info(
"BadNoSubscription received, ignoring because it's probably valid.")
444 response = ua.PublishResponse.from_binary(data)
445 self.logger.debug(response)
450 self.logger.exception(
"Error parsing notificatipn from server")
458 self.logger.warning(
"Received data for unknown subscription: %s ", response.Parameters.SubscriptionId)
463 callback(response.Parameters)
465 self.logger.exception(
"Exception while calling user callback: %s")
468 self.logger.info(
"create_monitored_items")
470 request.Parameters = params
471 data = self._uasocket.send_request(request)
472 response = ua.CreateMonitoredItemsResponse.from_binary(data)
473 self.logger.debug(response)
474 response.ResponseHeader.ServiceResult.check()
475 return response.Results
478 self.logger.info(
"delete_monitored_items")
480 request.Parameters = params
481 data = self._uasocket.send_request(request)
482 response = ua.DeleteMonitoredItemsResponse.from_binary(data)
483 self.logger.debug(response)
484 response.ResponseHeader.ServiceResult.check()
485 return response.Results
488 self.logger.info(
"add_nodes")
490 request.Parameters.NodesToAdd = nodestoadd
491 data = self._uasocket.send_request(request)
492 response = ua.AddNodesResponse.from_binary(data)
493 self.logger.debug(response)
494 response.ResponseHeader.ServiceResult.check()
495 return response.Results
498 self.logger.info(
"delete_nodes")
500 request.Parameters = params
501 data = self._uasocket.send_request(request)
502 response = ua.DeleteNodesResponse.from_binary(data)
503 self.logger.debug(response)
504 response.ResponseHeader.ServiceResult.check()
505 return response.Results
509 request.Parameters.MethodsToCall = methodstocall
510 data = self._uasocket.send_request(request)
511 response = ua.CallResponse.from_binary(data)
512 self.logger.debug(response)
513 response.ResponseHeader.ServiceResult.check()
514 return response.Results
517 self.logger.info(
"history_read")
519 request.Parameters = params
520 data = self._uasocket.send_request(request)
521 response = ua.HistoryReadResponse.from_binary(data)
522 self.logger.debug(response)
523 response.ResponseHeader.ServiceResult.check()
524 return response.Results
527 self.logger.info(
"modify_monitored_items")
529 request.Parameters = params
530 data = self._uasocket.send_request(request)
531 response = ua.ModifyMonitoredItemsResponse.from_binary(data)
532 self.logger.debug(response)
533 response.ResponseHeader.ServiceResult.check()
534 return response.Results
def _create_subscription_callback(self, pub_callback, resp_fut, data_fut)
def check_answer(self, data, context)
def add_nodes(self, nodestoadd)
def send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage)
def create_monitored_items(self, params)
def set_security(self, policy)
def find_servers_on_network(self, params)
def publish(self, acks=None)
def register_server2(self, params)
def translate_browsepaths_to_nodeids(self, browsepaths)
def delete_subscriptions(self, subscriptionids)
def __init__(self, timeout=1, security_policy=ua.SecurityPolicy())
def create_subscription(self, params, callback)
def call(self, methodstocall)
def get_endpoints(self, params)
def __init__(self, timeout=1)
def connect_socket(self, host, port)
def history_read(self, params)
def activate_session(self, parameters)
def _create_request_header(self, timeout=1000)
def _call_callback(self, request_id, body)
def delete_nodes(self, params)
def create_session(self, parameters)
def _send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage)
def open_secure_channel(self, params)
def find_servers(self, params)
def modify_monitored_items(self, params)
def open_secure_channel(self, params)
def _call_publish_callback(self, future)
def _delete_subscriptions_callback(self, subscriptionids, resp_fut, data_fut)
def close_secure_channel(self)
def connect_socket(self, host, port)
def browse(self, parameters)
def send_hello(self, url)
def read(self, parameters)
def delete_monitored_items(self, params)
def send_hello(self, url)
def close_secure_channel(self)
def close_session(self, deletesubscriptions)
def disconnect_socket(self)
def disconnect_socket(self)
def register_server(self, registered_server)