00001 import logging
00002 import sys
00003 import argparse
00004 from datetime import datetime, timedelta
00005 import math
00006 import time
00007
try:
    from IPython import embed
except ImportError:
    import code

    def embed():
        """Start a plain interactive console when IPython is not installed.

        The console namespace consists of this module's globals overlaid
        with the local variables of the function that called ``embed()``,
        so objects a caller prepares right before the call can be used at
        the prompt.
        """
        # Local import: only needed by this fallback.
        import inspect

        namespace = dict(globals())
        frame = inspect.currentframe()
        caller = None
        try:
            # currentframe() may return None on interpreters without
            # Python stack frame support; fall back to globals only.
            if frame is not None:
                caller = frame.f_back
            if caller is not None:
                namespace.update(caller.f_locals)
        finally:
            # Drop frame references so no reference cycle is kept alive.
            del frame
            del caller
        code.interact(local=namespace)
00015
00016 from opcua import ua
00017 from opcua import Client
00018 from opcua import Server
00019 from opcua import Node
00020 from opcua import uamethod
00021
00022
def add_minimum_args(parser):
    """Register the options shared by every client tool on *parser*.

    Adds ``-u/--url``, ``-v/--verbose`` (stored as ``loglevel``) and
    ``--timeout`` (an integer, default 1).
    """
    option_specs = (
        (("-u", "--url"),
         dict(metavar="URL",
              default='opc.tcp://localhost:4840',
              help="URL of OPC UA server (for example: opc.tcp://example.org:4840)")),
        (("-v", "--verbose"),
         dict(dest="loglevel",
              default='WARNING',
              choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
              help="Set log level")),
        (("--timeout",),
         dict(dest="timeout",
              default=1,
              type=int,
              help="Set socket timeout (NOT the diverse UA timeouts)")),
    )
    for flags, options in option_specs:
        parser.add_argument(*flags, **options)
00040
00041
def add_common_args(parser, default_node='i=84'):
    """Register the minimum options plus the node selection options.

    On top of what ``add_minimum_args`` adds, this registers ``-n/--nodeid``
    (defaulting to *default_node*), ``-p/--path``, ``-i/--namespace`` and
    ``--security`` on *parser*.
    """
    add_minimum_args(parser)
    parser.add_argument(
        "-n", "--nodeid",
        metavar="NODE",
        default=default_node,
        help="Fully-qualified node ID (for example: i=85). Default: root node")
    parser.add_argument(
        "-p", "--path",
        metavar="BROWSEPATH",
        default='',
        help="Comma separated browse path to the node starting at NODE (for example: 3:Mybject,3:MyVariable)")
    parser.add_argument(
        "-i", "--namespace",
        metavar="NAMESPACE",
        type=int,
        default=0,
        help="Default namespace")
    parser.add_argument(
        "--security",
        default='',
        help="Security settings, for example: Basic256,SignAndEncrypt,cert.der,pk.pem[,server_cert.der]. Default: None")
00063
00064
def _require_nodeid(parser, args):
    """Exit with status 1 unless a node was selected on the command line.

    A node counts as "not selected" when ``args.nodeid`` is ``"i=84"`` and
    ``args.path`` is empty. In that case the usage line and an error
    message are written to standard error (not standard output, so they do
    not mix with regular tool output) and the process exits.

    Raises:
        SystemExit: with code 1 when neither a node id nor a path was given.
    """
    if args.nodeid == "i=84" and args.path == "":
        parser.print_usage(sys.stderr)
        sys.stderr.write("{0}: error: A NodeId or BrowsePath is required\n".format(parser.prog))
        sys.exit(1)
00071
00072
def parse_args(parser, requirenodeid=False):
    """Parse the command line, configure logging and normalise the URL.

    The log level is taken from ``args.loglevel``. A non-empty URL without
    a ``://`` separator gets ``ua.OPC_TCP_SCHEME`` prepended. When
    *requirenodeid* is true, ``_require_nodeid`` is applied to the result.

    Returns the parsed argument namespace.
    """
    parsed = parser.parse_args()
    log_level = getattr(logging, parsed.loglevel)
    logging.basicConfig(format="%(levelname)s: %(message)s", level=log_level)

    url = parsed.url
    if url and '://' not in url:
        logging.info("Adding default scheme %s to URL %s", ua.OPC_TCP_SCHEME, url)
        parsed.url = ua.OPC_TCP_SCHEME + '://' + url

    if requirenodeid:
        _require_nodeid(parser, parsed)
    return parsed
00082
00083
def get_node(client, args):
    """Resolve the node selected by ``args.nodeid`` and ``args.path``.

    The node for ``args.nodeid`` is fetched from *client*. If a comma
    separated browse path is given, it is followed from that node; a
    leading ``0:Root`` element is dropped when the start node already is
    node ``i=84`` in namespace 0.
    """
    start = client.get_node(args.nodeid)
    if not args.path:
        return start

    segments = args.path.split(",")
    if start.nodeid == ua.NodeId(84, 0) and segments[0] == "0:Root":
        # The start node is the root itself, so skip the redundant element.
        segments = segments[1:]
    return start.get_child(segments)
00093
00094
def uaread():
    """Command line entry point: read one attribute of a node and print it.

    A node id or browse path is required. ``-a/--attribute`` selects the
    attribute id (default: the Value attribute) and ``-t/--datatype``
    selects how much of the result is printed: the bare python value, the
    variant, or the whole data value. The client is always disconnected
    before the process exits with status 0.
    """
    parser = argparse.ArgumentParser(description="Read attribute of a node, per default reads value of a node")
    add_common_args(parser)
    parser.add_argument("-a",
                        "--attribute",
                        dest="attribute",
                        type=int,
                        default=ua.AttributeIds.Value,
                        help="Set attribute to read")
    parser.add_argument("-t",
                        "--datatype",
                        dest="datatype",
                        default="python",
                        choices=['python', 'variant', 'datavalue'],
                        help="Data type to return")

    args = parse_args(parser, requirenodeid=True)

    client = Client(args.url, timeout=args.timeout)
    client.set_security_string(args.security)
    client.connect()
    try:
        node = get_node(client, args)
        attr = node.get_attribute(args.attribute)
        if args.datatype == "python":
            print(attr.Value.Value)
        elif args.datatype == "variant":
            print(attr.Value)
        else:
            print(attr)
    finally:
        client.disconnect()
    # The unreachable statement that used to follow sys.exit() was removed.
    sys.exit(0)
00129
00130
def _args_to_array(val, array):
    """Split *val* on commas when it should be treated as an array.

    *array* is ``"true"`` (always split), ``"guess"`` (split only when
    *val* contains a comma) or anything else (never split). Returns a list
    of strings when split, otherwise *val* unchanged.
    """
    treat_as_list = array == "true" or (array == "guess" and "," in val)
    return val.split(",") if treat_as_list else val
00138
00139
def _arg_to_bool(val):
    """Return True only when *val* is the string ``"true"`` or ``"True"``."""
    accepted_spellings = ("true", "True")
    return val in accepted_spellings
00142
00143
def _arg_to_variant(val, array, ptype, varianttype=None):
    """Convert a command line string into a ``ua.Variant``.

    *val* is first passed through ``_args_to_array``; every resulting item
    (or the single value) is converted with the callable *ptype*. The
    variant is built with *varianttype* when one is given, otherwise the
    variant type is left for ``ua.Variant`` to decide.
    """
    parsed = _args_to_array(val, array)
    if isinstance(parsed, list):
        converted = [ptype(item) for item in parsed]
    else:
        converted = ptype(parsed)

    variant_args = (converted, varianttype) if varianttype else (converted,)
    return ua.Variant(*variant_args)
00154
00155
def _str_to_bytes(val):
    """Return *val* as bytes; text is encoded as UTF-8, bytes pass through."""
    if isinstance(val, bytes):
        return val
    return val.encode("utf-8")


def _val_to_variant(val, args):
    """Convert the command line string *val* into a ``ua.Variant``.

    ``args.array`` controls array splitting (see ``_args_to_array``) and
    ``args.datatype`` selects the conversion:

    * ``"guess"``: booleans spelled true/True/false/False, then int, then
      float, finally a plain string.
    * any other name is matched case-insensitively, so both the lower case
      names offered by the command line parsers (``bytestring``,
      ``xmlelement``, ``localizedtext``) and the mixed-case spellings
      (``ByteString``, ``Guid``, ``LocalizedText``, ``xml``) are accepted.

    Raises:
        NotImplementedError: for ``datetime``, which is not supported.
        ValueError: for an unknown datatype, or when the value cannot be
            converted by the selected python type.
    """
    array = args.array
    datatype = args.datatype.lower()

    if datatype == "guess":
        if val in ("true", "True", "false", "False"):
            return _arg_to_variant(val, array, _arg_to_bool)
        for ptype in (int, float):
            try:
                return _arg_to_variant(val, array, ptype)
            except ValueError:
                continue
        return _arg_to_variant(val, array, str)

    if datatype == "bool":
        # Goes through _arg_to_variant so that arrays of booleans work too.
        return _arg_to_variant(val, array,
                               lambda item: item in ("1", "True", "true"),
                               ua.VariantType.Boolean)

    if datatype == "datetime":
        raise NotImplementedError("datetime values are not supported")

    # datatype -> (python converter, name of the ua.VariantType member).
    # Member names are resolved lazily so only the selected one is touched.
    simple_types = {
        "sbyte": (int, "SByte"),
        "byte": (int, "Byte"),
        "uint16": (int, "UInt16"),
        "uint32": (int, "UInt32"),
        "uint64": (int, "UInt64"),
        "int16": (int, "Int16"),
        "int32": (int, "Int32"),
        "int64": (int, "Int64"),
        "float": (float, "Float"),
        "double": (float, "Double"),
        "string": (str, "String"),
        "statuscode": (int, "StatusCode"),
        # bytes(str) fails on Python 3, hence the explicit encoding helper.
        "bytestring": (_str_to_bytes, "ByteString"),
        # NOTE(review): Guid is still handed over as bytes, as before;
        # confirm this is the representation ua.Variant expects.
        "guid": (_str_to_bytes, "Guid"),
        "xml": (str, "XmlElement"),
        "xmlelement": (str, "XmlElement"),
    }
    if datatype in simple_types:
        ptype, type_name = simple_types[datatype]
        return _arg_to_variant(val, array, ptype, getattr(ua.VariantType, type_name))

    if datatype == "nodeid":
        return _arg_to_variant(val, array, ua.NodeId.from_string, ua.VariantType.NodeId)
    if datatype == "expandednodeid":
        return _arg_to_variant(val, array, ua.ExpandedNodeId.from_string, ua.VariantType.ExpandedNodeId)
    if datatype in ("qualifiedname", "browsename"):
        return _arg_to_variant(val, array, ua.QualifiedName.from_string, ua.VariantType.QualifiedName)
    if datatype == "localizedtext":
        return _arg_to_variant(val, array, ua.LocalizedText, ua.VariantType.LocalizedText)

    # Previously an unknown datatype silently produced None.
    raise ValueError("Unsupported datatype: {0!r}".format(args.datatype))
00217
00218
def uawrite():
    """Command line entry point: write a value to one attribute of a node.

    A node id or browse path is required. ``-a/--attribute`` selects the
    attribute id (default: the Value attribute), ``-l/--list/--array``
    controls whether VALUE is split into an array and ``-t/--datatype``
    selects how VALUE is converted (see ``_val_to_variant``). The client is
    always disconnected before the process exits with status 0.
    """
    parser = argparse.ArgumentParser(description="Write attribute of a node, per default write value of node")
    add_common_args(parser)
    parser.add_argument("-a",
                        "--attribute",
                        dest="attribute",
                        type=int,
                        default=ua.AttributeIds.Value,
                        help="Set attribute to write")
    parser.add_argument("-l",
                        "--list",
                        "--array",
                        dest="array",
                        default="guess",
                        choices=["guess", "true", "false"],
                        help="Value is an array")
    parser.add_argument("-t",
                        "--datatype",
                        dest="datatype",
                        default="guess",
                        choices=["guess", 'byte', 'sbyte', 'nodeid', 'expandednodeid', 'qualifiedname', 'browsename', 'string', 'float', 'double', 'int16', 'int32', "int64", 'uint16', 'uint32', 'uint64', "bool", 'datetime', 'bytestring', 'xmlelement', 'statuscode', 'localizedtext'],
                        help="Data type of the value to write")
    parser.add_argument("value",
                        help="Value to be written",
                        metavar="VALUE")
    args = parse_args(parser, requirenodeid=True)

    client = Client(args.url, timeout=args.timeout)
    client.set_security_string(args.security)
    client.connect()
    try:
        node = get_node(client, args)
        val = _val_to_variant(args.value, args)
        node.set_attribute(args.attribute, ua.DataValue(val))
    finally:
        client.disconnect()
    # The unreachable statement that used to follow sys.exit() was removed.
    sys.exit(0)
00257
00258
def uals():
    """Command line entry point: browse a node and print its children.

    ``-l [N]`` selects the listing format: 0 prints names and node ids,
    1 (the default when ``-l`` is absent) adds browse names and values, and
    any other number (3 when ``-l`` is given without a number) prints the
    long format. ``-d/--depth`` sets how many levels are browsed. The
    client is always disconnected before the process exits with status 0.
    """
    parser = argparse.ArgumentParser(description="Browse OPC-UA node and print result")
    add_common_args(parser)
    parser.add_argument("-l",
                        dest="long_format",
                        const=3,
                        # Used when -l is absent; replaces the former
                        # "None means 1" fix-up after parsing.
                        default=1,
                        nargs="?",
                        type=int,
                        help="use a long listing format")
    parser.add_argument("-d",
                        "--depth",
                        default=1,
                        type=int,
                        help="Browse depth")

    args = parse_args(parser)

    client = Client(args.url, timeout=args.timeout)
    client.set_security_string(args.security)
    client.connect()
    try:
        node = get_node(client, args)
        print("Browsing node {0} at {1}\n".format(node, args.url))
        # The printers take the number of additional levels to descend.
        if args.long_format == 0:
            _lsprint_0(node, args.depth - 1)
        elif args.long_format == 1:
            _lsprint_1(node, args.depth - 1)
        else:
            _lsprint_long(node, args.depth - 1)
    finally:
        client.disconnect()
    # The unreachable statement that used to follow sys.exit() was removed.
    sys.exit(0)
00294
00295
def _lsprint_0(node, depth, indent=""):
    """Print display name and node id of every child of *node*.

    A header is printed only on the outermost call (empty *indent*). While
    *depth* is non-zero each child is browsed recursively with *depth*
    reduced by one and the indentation extended.
    """
    is_outermost = not indent
    if is_outermost:
        print("{0:30} {1:25}".format("DisplayName", "NodeId"))
        print("")

    row_template = "{0}{1:30} {2:25}"
    for desc in node.get_children_descriptions():
        display_name = desc.DisplayName.to_string()
        node_id_text = desc.NodeId.to_string()
        print(row_template.format(indent, display_name, node_id_text))
        if depth:
            child = Node(node.server, desc.NodeId)
            _lsprint_0(child, depth - 1, indent + " ")
00304
00305
def _lsprint_1(node, depth, indent=""):
    """Print display name, node id, browse name and value of each child.

    A header is printed only on the outermost call (empty *indent*). The
    value is read and printed only for children whose node class is
    Variable. While *depth* is non-zero each child is browsed recursively.
    """
    if not indent:
        header = "{0:30} {1:25} {2:25} {3:25}".format("DisplayName", "NodeId", "BrowseName", "Value")
        print(header)
        print("")

    for desc in node.get_children_descriptions():
        is_variable = desc.NodeClass == ua.NodeClass.Variable
        if is_variable:
            current_value = Node(node.server, desc.NodeId).get_value()
        columns = (indent,
                   desc.DisplayName.to_string(),
                   desc.NodeId.to_string(),
                   desc.BrowseName.to_string())
        if is_variable:
            print("{0}{1:30} {2!s:25} {3!s:25}, {4!s:3}".format(*(columns + (current_value,))))
        else:
            print("{0}{1:30} {2!s:25} {3!s:25}".format(*columns))
        if depth:
            _lsprint_1(Node(node.server, desc.NodeId), depth - 1, indent + " ")
00319
00320
def _lsprint_long(pnode, depth, indent=""):
    """Print the long listing of every child of *pnode*.

    A header is printed only on the outermost call (empty *indent*). For
    each child the display name, node id and browse name are printed;
    children whose node class is Variable additionally show data type,
    server timestamp of the value and the value itself. While *depth* is
    non-zero each child is browsed recursively.
    """
    if not indent:
        print("{0:30} {1:25} {2:25} {3:10} {4:30} {5:25}".format("DisplayName", "NodeId", "BrowseName", "DataType", "Timestamp", "Value"))
        print("")
    for node in pnode.get_children():
        attrs = node.get_attributes([ua.AttributeIds.DisplayName,
                                     ua.AttributeIds.BrowseName,
                                     ua.AttributeIds.NodeClass,
                                     ua.AttributeIds.WriteMask,
                                     ua.AttributeIds.UserWriteMask,
                                     ua.AttributeIds.DataType,
                                     ua.AttributeIds.Value])
        # The write masks are requested but not displayed.
        name, bname, nclass, _mask, _umask, dtype, val = [attr.Value.Value for attr in attrs]
        # The Value attribute is last in the request, so its timestamp is used.
        update = attrs[-1].ServerTimestamp
        if nclass == ua.NodeClass.Variable:
            print("{0}{1:30} {2:25} {3:25} {4:10} {5!s:30} {6!s:25}".format(indent, name.to_string(), node.nodeid.to_string(), bname.to_string(), dtype.to_string(), update, val))
        else:
            # Column order now matches the header and the Variable rows:
            # node id before browse name (they used to be swapped here).
            print("{0}{1:30} {2:25} {3:25}".format(indent, name.to_string(), node.nodeid.to_string(), bname.to_string()))
        if depth:
            _lsprint_long(node, depth - 1, indent + " ")
00341
00342
class SubHandler(object):
    """Subscription handler that prints every notification it receives."""

    def datachange_notification(self, node, val, data):
        """Print the node, the new value and the accompanying data object."""
        print("New data change event", node, val, data)

    def event_notification(self, event):
        """Print the received event."""
        print("New event", event)
00350
00351
def uasubscribe():
    """Command line entry point: subscribe to a node and print notifications.

    ``-t/--eventtype`` chooses between data change and event
    subscriptions. A data change subscription requires a node id or browse
    path; an event subscription without one falls back to node ``i=2253``.
    Notifications are printed by ``SubHandler`` until the user presses
    Ctrl-C, after which the client is disconnected and the process exits
    with status 0.
    """
    parser = argparse.ArgumentParser(description="Subscribe to a node and print results")
    add_common_args(parser)
    parser.add_argument("-t",
                        "--eventtype",
                        dest="eventtype",
                        default="datachange",
                        choices=['datachange', 'event'],
                        help="Event type to subscribe to")

    args = parse_args(parser, requirenodeid=False)
    if args.eventtype == "datachange":
        _require_nodeid(parser, args)
    else:
        # No node selected: subscribe to events on node i=2253 instead.
        if args.nodeid == "i=84" and args.path == "":
            args.nodeid = "i=2253"

    client = Client(args.url, timeout=args.timeout)
    client.set_security_string(args.security)
    client.connect()
    try:
        node = get_node(client, args)
        handler = SubHandler()
        sub = client.create_subscription(500, handler)
        if args.eventtype == "datachange":
            sub.subscribe_data_change(node)
        else:
            sub.subscribe_events(node)
        print("Type Ctr-C to exit")
        # Notifications are delivered to the handler while this thread idles.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        # Ctrl-C is the advertised way to stop, so leave without a traceback.
        pass
    finally:
        client.disconnect()
    # The unreachable statement that used to follow sys.exit() was removed.
    sys.exit(0)
00388
00389
def application_to_strings(app):
    """Describe an application description as a list of (label, value) pairs.

    The application URI always comes first. The optional fields follow in
    a fixed order, each included only when its value is truthy. One
    'Discovery URL' pair per entry of ``app.DiscoveryUrls`` ends the list.
    """
    rows = [('Application URI', app.ApplicationUri)]
    optional_fields = (
        ('Product URI', app.ProductUri),
        ('Application Name', app.ApplicationName.to_string()),
        ('Application Type', str(app.ApplicationType)),
        ('Gateway Server URI', app.GatewayServerUri),
        ('Discovery Profile URI', app.DiscoveryProfileUri),
    )
    rows.extend((label, value) for (label, value) in optional_fields if value)
    rows.extend(('Discovery URL', url) for url in app.DiscoveryUrls)
    return rows
00406
00407
def cert_to_string(der):
    """Return a printable description of a DER encoded certificate.

    An empty or missing certificate yields ``'[no certificate]'``. When
    ``opcua.crypto.uacrypto`` cannot be imported only the size in bytes is
    reported; otherwise the certificate is parsed and rendered by that
    module.
    """
    if der:
        try:
            from opcua.crypto import uacrypto
        except ImportError:
            return "{0} bytes".format(len(der))
        return uacrypto.x509_to_string(uacrypto.x509_from_der(der))
    return '[no certificate]'
00417
00418
def endpoint_to_strings(ep):
    """Describe an endpoint description as a list of (label, value) pairs.

    The list holds the endpoint URL, the pairs of the embedded server
    application, certificate and security settings, one group of pairs per
    user identity token, and finally transport profile and security level.
    """
    rows = [('Endpoint URL', ep.EndpointUrl)]
    rows.extend(application_to_strings(ep.Server))
    rows.append(('Server Certificate', cert_to_string(ep.ServerCertificate)))
    rows.append(('Security Mode', str(ep.SecurityMode)))
    rows.append(('Security Policy URI', ep.SecurityPolicyUri))

    for tok in ep.UserIdentityTokens:
        rows.append(('User policy', tok.PolicyId))
        rows.append((' Token type', str(tok.TokenType)))
        # Issued-token details are shown as a pair as soon as either is set.
        if tok.IssuedTokenType or tok.IssuerEndpointUrl:
            rows.append((' Issued Token type', tok.IssuedTokenType))
            rows.append((' Issuer Endpoint URL', tok.IssuerEndpointUrl))
        if tok.SecurityPolicyUri:
            rows.append((' Security Policy URI', tok.SecurityPolicyUri))

    rows.append(('Transport Profile URI', ep.TransportProfileUri))
    rows.append(('Security Level', ep.SecurityLevel))
    return rows
00440
00441
def uaclient():
    """Command line entry point: connect to a server and open a python shell.

    Optionally loads a client certificate and private key before
    connecting. The client is always disconnected before the process exits
    with status 0.
    """
    parser = argparse.ArgumentParser(description="Connect to server and start python shell. root and objects nodes are available. Node specificed in command line is available as mynode variable")
    add_common_args(parser)
    parser.add_argument("-c", "--certificate", help="set client certificate")
    parser.add_argument("-k", "--private_key", help="set client private key")
    args = parse_args(parser)

    client = Client(args.url, timeout=args.timeout)
    client.set_security_string(args.security)
    if args.certificate:
        client.load_client_certificate(args.certificate)
    if args.private_key:
        client.load_private_key(args.private_key)
    client.connect()
    try:
        # These locals are not used below on purpose: the description
        # above promises them to the user of the interactive shell.
        root = client.get_root_node()
        objects = client.get_objects_node()
        mynode = get_node(client, args)
        embed()
    finally:
        client.disconnect()
    sys.exit(0)
00468
00469
def uaserver():
    """Command line entry point: run an example OPC-UA server.

    The address space can be filled from an XML file (``-x``) and/or with
    a few sample nodes (``-p``). With ``-s`` a python shell is started;
    otherwise, when sample nodes exist, two of them are updated once per
    second with sine values; otherwise the server simply idles. The server
    is always stopped before the process exits with status 0.
    """
    parser = argparse.ArgumentParser(description="Run an example OPC-UA server. By importing xml definition and using uawrite command line, it is even possible to expose real data using this server")
    # This tool has no node options, so the common arguments are not reused.
    parser.add_argument("-u",
                        "--url",
                        help="URL of OPC UA server, default is opc.tcp://0.0.0.0:4840",
                        default='opc.tcp://0.0.0.0:4840',
                        metavar="URL")
    parser.add_argument("-v",
                        "--verbose",
                        dest="loglevel",
                        choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
                        default='WARNING',
                        help="Set log level")
    parser.add_argument("-x",
                        "--xml",
                        metavar="XML_FILE",
                        help="Populate address space with nodes defined in XML")
    parser.add_argument("-p",
                        "--populate",
                        action="store_true",
                        help="Populate address space with some sample nodes")
    parser.add_argument("-c",
                        "--disable-clock",
                        action="store_true",
                        help="Disable clock, to avoid seeing many write if debugging an application")
    parser.add_argument("-s",
                        "--shell",
                        action="store_true",
                        help="Start python shell instead of randomly changing node values")
    parser.add_argument("--certificate",
                        help="set server certificate")
    parser.add_argument("--private_key",
                        help="set server private key")
    args = parser.parse_args()
    logging.basicConfig(format="%(levelname)s: %(message)s", level=getattr(logging, args.loglevel))

    server = Server()
    server.set_endpoint(args.url)
    if args.certificate:
        server.load_certificate(args.certificate)
    if args.private_key:
        server.load_private_key(args.private_key)
    server.disable_clock(args.disable_clock)
    server.set_server_name("FreeOpcUa Example Server")
    if args.xml:
        server.import_xml(args.xml)
    if args.populate:
        @uamethod
        def multiply(parent, x, y):
            print("multiply method call with parameters: ", x, y)
            return x * y

        uri = "http://examples.freeopcua.github.io"
        idx = server.register_namespace(uri)
        objects = server.get_objects_node()
        myobj = objects.add_object(idx, "MyObject")
        mywritablevar = myobj.add_variable(idx, "MyWritableVariable", 6.7)
        mywritablevar.set_writable()
        myvar = myobj.add_variable(idx, "MyVariable", 6.7)
        myarrayvar = myobj.add_variable(idx, "MyVarArray", [6.7, 7.9])
        # myprop and mymethod are not used below; the names are kept so
        # they remain reachable from the shell started with --shell.
        myprop = myobj.add_property(idx, "MyProperty", "I am a property")
        mymethod = myobj.add_method(idx, "MyMethod", multiply, [ua.VariantType.Double, ua.VariantType.Int64], [ua.VariantType.Double])

    server.start()
    try:
        if args.shell:
            embed()
        elif args.populate:
            count = 0
            while True:
                time.sleep(1)
                # Float divisors: with "/ 10" an interpreter using integer
                # division would feed sin() a stepped value.
                fast = math.sin(count / 10.0)
                slow = math.sin(count / 100.0)
                myvar.set_value(fast)
                myarrayvar.set_value([fast, slow])
                count += 1
        else:
            while True:
                time.sleep(1)
    finally:
        server.stop()
    sys.exit(0)
00551
00552
def uadiscover():
    """Command line entry point: print servers and endpoints found at a URL.

    With ``-n/--network`` a find-servers-on-network request is sent first;
    for its results only a numbered heading is printed. Then the servers
    and the endpoints reported for the URL are printed as labelled fields.
    The process exits with status 0 afterwards.
    """
    parser = argparse.ArgumentParser(description="Performs OPC UA discovery and prints information on servers and endpoints.")
    add_minimum_args(parser)
    parser.add_argument("-n", "--network",
                        action="store_true",
                        help="Also send a FindServersOnNetwork request to server")
    args = parse_args(parser)

    client = Client(args.url, timeout=args.timeout)

    def show_fields(pairs):
        # One "label: value" line per pair, followed by an empty line.
        for (label, value) in pairs:
            print(' {0}: {1}'.format(label, value))
        print('')

    if args.network:
        print("Performing discovery at {0}\n".format(args.url))
        for position, _unused in enumerate(client.connect_and_find_servers_on_network(), start=1):
            print('Server {0}:'.format(position))
            print('')

    print("Performing discovery at {0}\n".format(args.url))
    for position, server in enumerate(client.connect_and_find_servers(), start=1):
        print('Server {0}:'.format(position))
        show_fields(application_to_strings(server))

    for position, endpoint in enumerate(client.connect_and_get_server_endpoints(), start=1):
        print('Endpoint {0}:'.format(position))
        show_fields(endpoint_to_strings(endpoint))

    sys.exit(0)
00594
00595
def print_history(o):
    """Print a table of the data values in *o* if it is ``ua.HistoryData``.

    Objects of any other type are ignored. Each row shows the source
    timestamp, the name of the status code and the value.
    """
    if not isinstance(o, ua.HistoryData):
        return
    row_template = "{0:30} {1:10} {2}"
    print(row_template.format('Source timestamp', 'Status', 'Value'))
    for data_value in o.DataValues:
        print(row_template.format(str(data_value.SourceTimestamp), data_value.StatusCode.name, data_value.Value))
00601
00602
def str_to_datetime(s, default=None):
    """Parse a ``YYYY-MM-DD [HH:MM[:SS]]`` string into a ``datetime``.

    Args:
        s: the text to parse; may be empty or None.
        default: returned when *s* is empty or None. If *default* is None
            as well, the current UTC time is returned instead.

    Returns:
        A naive ``datetime`` object.

    Raises:
        ValueError: when *s* is non-empty but matches none of the accepted
            formats. Previously this case silently returned None.
    """
    if not s:
        if default is not None:
            return default
        return datetime.utcnow()

    for fmt in ("%Y-%m-%d", "%Y-%m-%d %H:%M", "%Y-%m-%d %H:%M:%S"):
        try:
            return datetime.strptime(s, fmt)
        except ValueError:
            # Not this format; try the next, more detailed one.
            continue
    raise ValueError("Cannot parse {0!r} as a date; expected YYYY-MM-DD [HH:MM[:SS]]".format(s))
00614
00615
def uahistoryread():
    """Command line entry point: read and print the history of a node.

    A node id or browse path is required. The time window defaults to the
    last day up to now (UTC). With ``-e/--events`` the event history is
    printed event by event, otherwise the raw data history is printed by
    ``print_history``. ``-l/--limit`` is passed on as the number of values
    to read. The client is always disconnected before the process exits
    with status 0.
    """
    parser = argparse.ArgumentParser(description="Read history of a node")
    add_common_args(parser)
    parser.add_argument(
        "--starttime",
        default=None,
        help="Start time, formatted as YYYY-MM-DD [HH:MM[:SS]]. Default: current time - one day")
    parser.add_argument(
        "--endtime",
        default=None,
        help="End time, formatted as YYYY-MM-DD [HH:MM[:SS]]. Default: current time")
    parser.add_argument(
        "-e", "--events",
        action="store_true",
        help="Read event history instead of data change history")
    parser.add_argument(
        "-l", "--limit",
        type=int,
        default=10,
        help="Maximum number of notfication to return")

    args = parse_args(parser, requirenodeid=True)

    client = Client(args.url, timeout=args.timeout)
    client.set_security_string(args.security)
    client.connect()
    try:
        node = get_node(client, args)
        one_day_ago = datetime.utcnow() - timedelta(days=1)
        starttime = str_to_datetime(args.starttime, one_day_ago)
        endtime = str_to_datetime(args.endtime, datetime.utcnow())
        print("Reading raw history of node {0} at {1}; start at {2}, end at {3}\n".format(node, args.url, starttime, endtime))
        if not args.events:
            print_history(node.read_raw_history(starttime, endtime, numvalues=args.limit))
        else:
            for ev in node.read_event_history(starttime, endtime, numvalues=args.limit):
                print(ev)
    finally:
        client.disconnect()
    sys.exit(0)
00654
00655
def uacall():
    """Command line entry point: call a method of a node and print the result.

    A node id or browse path is required. The method is selected with
    ``-m/--method`` (an integer identifier); without it the selected node
    must have exactly one method, which is then used. An optional VALUE is
    converted with ``_val_to_variant`` and passed as the single argument.
    The client is always disconnected before the process exits with
    status 0.

    Raises:
        ValueError: when no method is given and the node has either no
            method or more than one.
    """
    parser = argparse.ArgumentParser(description="Call method of a node")
    add_common_args(parser)
    parser.add_argument("-m",
                        "--method",
                        dest="method",
                        type=int,
                        default=None,
                        help="Set method to call. If not given then (single) method of the selected node is used.")
    parser.add_argument("-l",
                        "--list",
                        "--array",
                        dest="array",
                        default="guess",
                        choices=["guess", "true", "false"],
                        help="Value is an array")
    parser.add_argument("-t",
                        "--datatype",
                        dest="datatype",
                        default="guess",
                        choices=["guess", 'byte', 'sbyte', 'nodeid', 'expandednodeid', 'qualifiedname', 'browsename', 'string', 'float', 'double', 'int16', 'int32', "int64", 'uint16', 'uint32', 'uint64', "bool", 'datetime', 'bytestring', 'xmlelement', 'statuscode', 'localizedtext'],
                        help="Data type of the value passed to the method")
    parser.add_argument("value",
                        help="Value to use for call to method, if any",
                        nargs="?",
                        metavar="VALUE")

    args = parse_args(parser, requirenodeid=True)

    client = Client(args.url, timeout=args.timeout)
    client.set_security_string(args.security)
    client.connect()
    try:
        node = get_node(client, args)

        # The arguments are kept in a tuple so that a call without any
        # argument is possible as well.
        if args.value is None:
            val = ()
        else:
            val = (_val_to_variant(args.value, args),)

        methods = node.get_methods()
        method_id = None

        if args.method is None:
            # No method given: only unambiguous when the node has exactly one.
            if not methods:
                raise ValueError("No methods in selected node and no method given")
            elif len(methods) == 1:
                method_id = methods[0]
            else:
                # The count and the list are passed explicitly; unpacking
                # the list into format() broke the "{0:d}" placeholder.
                raise ValueError("Selected node has {0:d} methods but no method given. Provide one of {1!s}".format(len(methods), methods))
        else:
            for m in methods:
                if m.nodeid.Identifier == args.method:
                    method_id = m.nodeid
                    break

        if method_id is None:
            # Last resort: build a node id from the bare identifier.
            method_id = ua.NodeId(identifier=args.method)

        result_variants = node.call_method(method_id, *val)
        print("resulting result_variants={0!s}".format(result_variants))
    finally:
        client.disconnect()
    # The unreachable statement that used to follow sys.exit() was removed.
    sys.exit(0)