tools.py
Go to the documentation of this file.
00001 import logging
00002 import sys
00003 import argparse
00004 from datetime import datetime, timedelta
00005 import math
00006 import time
00007 
00008 try:
00009     from IPython import embed
00010 except ImportError:
00011     import code
00012 
00013     def embed():
00014         code.interact(local=dict(globals(), **locals()))
00015 
00016 from opcua import ua
00017 from opcua import Client
00018 from opcua import Server
00019 from opcua import Node
00020 from opcua import uamethod
00021 
00022 
00023 def add_minimum_args(parser):
00024     parser.add_argument("-u",
00025                         "--url",
00026                         help="URL of OPC UA server (for example: opc.tcp://example.org:4840)",
00027                         default='opc.tcp://localhost:4840',
00028                         metavar="URL")
00029     parser.add_argument("-v",
00030                         "--verbose",
00031                         dest="loglevel",
00032                         choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
00033                         default='WARNING',
00034                         help="Set log level")
00035     parser.add_argument("--timeout",
00036                         dest="timeout",
00037                         type=int,
00038                         default=1,
00039                         help="Set socket timeout (NOT the diverse UA timeouts)")
00040 
00041 
00042 def add_common_args(parser, default_node='i=84'):
00043     add_minimum_args(parser)
00044     parser.add_argument("-n",
00045                         "--nodeid",
00046                         help="Fully-qualified node ID (for example: i=85). Default: root node",
00047                         default=default_node,
00048                         metavar="NODE")
00049     parser.add_argument("-p",
00050                         "--path",
00051                         help="Comma separated browse path to the node starting at NODE (for example: 3:Mybject,3:MyVariable)",
00052                         default='',
00053                         metavar="BROWSEPATH")
00054     parser.add_argument("-i",
00055                         "--namespace",
00056                         help="Default namespace",
00057                         type=int,
00058                         default=0,
00059                         metavar="NAMESPACE")
00060     parser.add_argument("--security",
00061                         help="Security settings, for example: Basic256,SignAndEncrypt,cert.der,pk.pem[,server_cert.der]. Default: None",
00062                         default='')
00063 
00064 
00065 def _require_nodeid(parser, args):
00066     # check that a nodeid has been given explicitly, a bit hackish...
00067     if args.nodeid == "i=84" and args.path == "":
00068         parser.print_usage()
00069         print("{0}: error: A NodeId or BrowsePath is required".format(parser.prog))
00070         sys.exit(1)
00071 
00072 
00073 def parse_args(parser, requirenodeid=False):
00074     args = parser.parse_args()
00075     logging.basicConfig(format="%(levelname)s: %(message)s", level=getattr(logging, args.loglevel))
00076     if args.url and '://' not in args.url:
00077         logging.info("Adding default scheme %s to URL %s", ua.OPC_TCP_SCHEME, args.url)
00078         args.url = ua.OPC_TCP_SCHEME + '://' + args.url
00079     if requirenodeid:
00080         _require_nodeid(parser, args)
00081     return args
00082 
00083 
00084 def get_node(client, args):
00085     node = client.get_node(args.nodeid)
00086     if args.path:
00087         path = args.path.split(",")
00088         if node.nodeid == ua.NodeId(84, 0) and path[0] == "0:Root":
00089             # let user specify root if not node given
00090             path = path[1:]
00091         node = node.get_child(path)
00092     return node
00093 
00094 
00095 def uaread():
00096     parser = argparse.ArgumentParser(description="Read attribute of a node, per default reads value of a node")
00097     add_common_args(parser)
00098     parser.add_argument("-a",
00099                         "--attribute",
00100                         dest="attribute",
00101                         type=int,
00102                         default=ua.AttributeIds.Value,
00103                         help="Set attribute to read")
00104     parser.add_argument("-t",
00105                         "--datatype",
00106                         dest="datatype",
00107                         default="python",
00108                         choices=['python', 'variant', 'datavalue'],
00109                         help="Data type to return")
00110 
00111     args = parse_args(parser, requirenodeid=True)
00112 
00113     client = Client(args.url, timeout=args.timeout)
00114     client.set_security_string(args.security)
00115     client.connect()
00116     try:
00117         node = get_node(client, args)
00118         attr = node.get_attribute(args.attribute)
00119         if args.datatype == "python":
00120             print(attr.Value.Value)
00121         elif args.datatype == "variant":
00122             print(attr.Value)
00123         else:
00124             print(attr)
00125     finally:
00126         client.disconnect()
00127     sys.exit(0)
00128     print(args)
00129 
00130 
00131 def _args_to_array(val, array):
00132     if array == "guess":
00133         if "," in val:
00134             array = "true"
00135     if array == "true":
00136         val = val.split(",")
00137     return val
00138 
00139 
00140 def _arg_to_bool(val):
00141     return val in ("true", "True")
00142 
00143 
00144 def _arg_to_variant(val, array, ptype, varianttype=None):
00145     val = _args_to_array(val, array)
00146     if isinstance(val, list):
00147         val = [ptype(i) for i in val]
00148     else:
00149         val = ptype(val)
00150     if varianttype:
00151         return ua.Variant(val, varianttype)
00152     else:
00153         return ua.Variant(val)
00154 
00155 
00156 def _val_to_variant(val, args):
00157     array = args.array
00158     if args.datatype == "guess":
00159         if val in ("true", "True", "false", "False"):
00160             return _arg_to_variant(val, array, _arg_to_bool)
00161         try:
00162             return _arg_to_variant(val, array, int)
00163         except ValueError:
00164             try:
00165                 return _arg_to_variant(val, array, float)
00166             except ValueError:
00167                 return _arg_to_variant(val, array, str)
00168     elif args.datatype == "bool":
00169         if val in ("1", "True", "true"):
00170             return ua.Variant(True, ua.VariantType.Boolean)
00171         else:
00172             return ua.Variant(False, ua.VariantType.Boolean)
00173     elif args.datatype == "sbyte":
00174         return _arg_to_variant(val, array, int, ua.VariantType.SByte)
00175     elif args.datatype == "byte":
00176         return _arg_to_variant(val, array, int, ua.VariantType.Byte)
00177     #elif args.datatype == "uint8":
00178         #return _arg_to_variant(val, array, int, ua.VariantType.Byte)
00179     elif args.datatype == "uint16":
00180         return _arg_to_variant(val, array, int, ua.VariantType.UInt16)
00181     elif args.datatype == "uint32":
00182         return _arg_to_variant(val, array, int, ua.VariantType.UInt32)
00183     elif args.datatype == "uint64":
00184         return _arg_to_variant(val, array, int, ua.VariantType.UInt64)
00185     #elif args.datatype == "int8":
00186         #return ua.Variant(int(val), ua.VariantType.Int8)
00187     elif args.datatype == "int16":
00188         return _arg_to_variant(val, array, int, ua.VariantType.Int16)
00189     elif args.datatype == "int32":
00190         return _arg_to_variant(val, array, int, ua.VariantType.Int32)
00191     elif args.datatype == "int64":
00192         return _arg_to_variant(val, array, int, ua.VariantType.Int64)
00193     elif args.datatype == "float":
00194         return _arg_to_variant(val, array, float, ua.VariantType.Float)
00195     elif args.datatype == "double":
00196         return _arg_to_variant(val, array, float, ua.VariantType.Double)
00197     elif args.datatype == "string":
00198         return _arg_to_variant(val, array, str, ua.VariantType.String)
00199     elif args.datatype == "datetime":
00200         raise NotImplementedError
00201     elif args.datatype == "Guid":
00202         return _arg_to_variant(val, array, bytes, ua.VariantType.Guid)
00203     elif args.datatype == "ByteString":
00204         return _arg_to_variant(val, array, bytes, ua.VariantType.ByteString)
00205     elif args.datatype == "xml":
00206         return _arg_to_variant(val, array, str, ua.VariantType.XmlElement)
00207     elif args.datatype == "nodeid":
00208         return _arg_to_variant(val, array, ua.NodeId.from_string, ua.VariantType.NodeId)
00209     elif args.datatype == "expandednodeid":
00210         return _arg_to_variant(val, array, ua.ExpandedNodeId.from_string, ua.VariantType.ExpandedNodeId)
00211     elif args.datatype == "statuscode":
00212         return _arg_to_variant(val, array, int, ua.VariantType.StatusCode)
00213     elif args.datatype in ("qualifiedname", "browsename"):
00214         return _arg_to_variant(val, array, ua.QualifiedName.from_string, ua.VariantType.QualifiedName)
00215     elif args.datatype == "LocalizedText":
00216         return _arg_to_variant(val, array, ua.LocalizedText, ua.VariantType.LocalizedText)
00217 
00218 
00219 def uawrite():
00220     parser = argparse.ArgumentParser(description="Write attribute of a node, per default write value of node")
00221     add_common_args(parser)
00222     parser.add_argument("-a",
00223                         "--attribute",
00224                         dest="attribute",
00225                         type=int,
00226                         default=ua.AttributeIds.Value,
00227                         help="Set attribute to read")
00228     parser.add_argument("-l",
00229                         "--list",
00230                         "--array",
00231                         dest="array",
00232                         default="guess",
00233                         choices=["guess", "true", "false"],
00234                         help="Value is an array")
00235     parser.add_argument("-t",
00236                         "--datatype",
00237                         dest="datatype",
00238                         default="guess",
00239                         choices=["guess", 'byte', 'sbyte', 'nodeid', 'expandednodeid', 'qualifiedname', 'browsename', 'string', 'float', 'double', 'int16', 'int32', "int64", 'uint16', 'uint32', 'uint64', "bool", "string", 'datetime', 'bytestring', 'xmlelement', 'statuscode', 'localizedtext'],
00240                         help="Data type to return")
00241     parser.add_argument("value",
00242                         help="Value to be written",
00243                         metavar="VALUE")
00244     args = parse_args(parser, requirenodeid=True)
00245 
00246     client = Client(args.url, timeout=args.timeout)
00247     client.set_security_string(args.security)
00248     client.connect()
00249     try:
00250         node = get_node(client, args)
00251         val = _val_to_variant(args.value, args)
00252         node.set_attribute(args.attribute, ua.DataValue(val))
00253     finally:
00254         client.disconnect()
00255     sys.exit(0)
00256     print(args)
00257 
00258 
00259 def uals():
00260     parser = argparse.ArgumentParser(description="Browse OPC-UA node and print result")
00261     add_common_args(parser)
00262     parser.add_argument("-l",
00263                         dest="long_format",
00264                         const=3,
00265                         nargs="?",
00266                         type=int,
00267                         help="use a long listing format")
00268     parser.add_argument("-d",
00269                         "--depth",
00270                         default=1,
00271                         type=int,
00272                         help="Browse depth")
00273 
00274     args = parse_args(parser)
00275     if args.long_format is None:
00276         args.long_format = 1
00277 
00278     client = Client(args.url, timeout=args.timeout)
00279     client.set_security_string(args.security)
00280     client.connect()
00281     try:
00282         node = get_node(client, args)
00283         print("Browsing node {0} at {1}\n".format(node, args.url))
00284         if args.long_format == 0:
00285             _lsprint_0(node, args.depth - 1)
00286         elif args.long_format == 1:
00287             _lsprint_1(node, args.depth - 1)
00288         else:
00289             _lsprint_long(node, args.depth - 1)
00290     finally:
00291         client.disconnect()
00292     sys.exit(0)
00293     print(args)
00294 
00295 
00296 def _lsprint_0(node, depth, indent=""):
00297     if not indent:
00298         print("{0:30} {1:25}".format("DisplayName", "NodeId"))
00299         print("")
00300     for desc in node.get_children_descriptions():
00301         print("{0}{1:30} {2:25}".format(indent, desc.DisplayName.to_string(), desc.NodeId.to_string()))
00302         if depth:
00303             _lsprint_0(Node(node.server, desc.NodeId), depth - 1, indent + "  ")
00304 
00305 
00306 def _lsprint_1(node, depth, indent=""):
00307     if not indent:
00308         print("{0:30} {1:25} {2:25} {3:25}".format("DisplayName", "NodeId", "BrowseName", "Value"))
00309         print("")
00310 
00311     for desc in node.get_children_descriptions():
00312         if desc.NodeClass == ua.NodeClass.Variable:
00313             val = Node(node.server, desc.NodeId).get_value()
00314             print("{0}{1:30} {2!s:25} {3!s:25}, {4!s:3}".format(indent, desc.DisplayName.to_string(), desc.NodeId.to_string(), desc.BrowseName.to_string(), val))
00315         else:
00316             print("{0}{1:30} {2!s:25} {3!s:25}".format(indent, desc.DisplayName.to_string(), desc.NodeId.to_string(), desc.BrowseName.to_string()))
00317         if depth:
00318             _lsprint_1(Node(node.server, desc.NodeId), depth - 1, indent + "  ")
00319 
00320 
00321 def _lsprint_long(pnode, depth, indent=""):
00322     if not indent:
00323         print("{0:30} {1:25} {2:25} {3:10} {4:30} {5:25}".format("DisplayName", "NodeId", "BrowseName", "DataType", "Timestamp", "Value"))
00324         print("")
00325     for node in pnode.get_children():
00326         attrs = node.get_attributes([ua.AttributeIds.DisplayName,
00327                                      ua.AttributeIds.BrowseName,
00328                                      ua.AttributeIds.NodeClass,
00329                                      ua.AttributeIds.WriteMask,
00330                                      ua.AttributeIds.UserWriteMask,
00331                                      ua.AttributeIds.DataType,
00332                                      ua.AttributeIds.Value])
00333         name, bname, nclass, mask, umask, dtype, val = [attr.Value.Value for attr in attrs]
00334         update = attrs[-1].ServerTimestamp
00335         if nclass == ua.NodeClass.Variable:
00336             print("{0}{1:30} {2:25} {3:25} {4:10} {5!s:30} {6!s:25}".format(indent, name.to_string(), node.nodeid.to_string(), bname.to_string(), dtype.to_string(), update, val))
00337         else:
00338             print("{0}{1:30} {2:25} {3:25}".format(indent, name.to_string(), bname.to_string(), node.nodeid.to_string()))
00339         if depth:
00340             _lsprint_long(node, depth - 1, indent + "  ")
00341 
00342 
00343 class SubHandler(object):
00344 
00345     def datachange_notification(self, node, val, data):
00346         print("New data change event", node, val, data)
00347 
00348     def event_notification(self, event):
00349         print("New event", event)
00350 
00351 
00352 def uasubscribe():
00353     parser = argparse.ArgumentParser(description="Subscribe to a node and print results")
00354     add_common_args(parser)
00355     parser.add_argument("-t",
00356                         "--eventtype",
00357                         dest="eventtype",
00358                         default="datachange",
00359                         choices=['datachange', 'event'],
00360                         help="Event type to subscribe to")
00361 
00362     args = parse_args(parser, requirenodeid=False)
00363     if args.eventtype == "datachange":
00364         _require_nodeid(parser, args)
00365     else:
00366         # FIXME: this is broken, someone may have written i=84 on purpose
00367         if args.nodeid == "i=84" and args.path == "":
00368             args.nodeid = "i=2253"
00369 
00370     client = Client(args.url, timeout=args.timeout)
00371     client.set_security_string(args.security)
00372     client.connect()
00373     try:
00374         node = get_node(client, args)
00375         handler = SubHandler()
00376         sub = client.create_subscription(500, handler)
00377         if args.eventtype == "datachange":
00378             sub.subscribe_data_change(node)
00379         else:
00380             sub.subscribe_events(node)
00381         print("Type Ctr-C to exit")
00382         while True:
00383             time.sleep(1)
00384     finally:
00385         client.disconnect()
00386     sys.exit(0)
00387     print(args)
00388 
00389 
00390 def application_to_strings(app):
00391     result = []
00392     result.append(('Application URI', app.ApplicationUri))
00393     optionals = [
00394         ('Product URI', app.ProductUri),
00395         ('Application Name', app.ApplicationName.to_string()),
00396         ('Application Type', str(app.ApplicationType)),
00397         ('Gateway Server URI', app.GatewayServerUri),
00398         ('Discovery Profile URI', app.DiscoveryProfileUri),
00399     ]
00400     for (n, v) in optionals:
00401         if v:
00402             result.append((n, v))
00403     for url in app.DiscoveryUrls:
00404         result.append(('Discovery URL', url))
00405     return result  # ['{}: {}'.format(n, v) for (n, v) in result]
00406 
00407 
00408 def cert_to_string(der):
00409     if not der:
00410         return '[no certificate]'
00411     try:
00412         from opcua.crypto import uacrypto
00413     except ImportError:
00414         return "{0} bytes".format(len(der))
00415     cert = uacrypto.x509_from_der(der)
00416     return uacrypto.x509_to_string(cert)
00417 
00418 
00419 def endpoint_to_strings(ep):
00420     result = [('Endpoint URL', ep.EndpointUrl)]
00421     result += application_to_strings(ep.Server)
00422     result += [
00423         ('Server Certificate', cert_to_string(ep.ServerCertificate)),
00424         ('Security Mode', str(ep.SecurityMode)),
00425         ('Security Policy URI', ep.SecurityPolicyUri)]
00426     for tok in ep.UserIdentityTokens:
00427         result += [
00428             ('User policy', tok.PolicyId),
00429             ('  Token type', str(tok.TokenType))]
00430         if tok.IssuedTokenType or tok.IssuerEndpointUrl:
00431             result += [
00432                 ('  Issued Token type', tok.IssuedTokenType),
00433                 ('  Issuer Endpoint URL', tok.IssuerEndpointUrl)]
00434         if tok.SecurityPolicyUri:
00435             result.append(('  Security Policy URI', tok.SecurityPolicyUri))
00436     result += [
00437         ('Transport Profile URI', ep.TransportProfileUri),
00438         ('Security Level', ep.SecurityLevel)]
00439     return result
00440 
00441 
00442 def uaclient():
00443     parser = argparse.ArgumentParser(description="Connect to server and start python shell. root and objects nodes are available. Node specificed in command line is available as mynode variable")
00444     add_common_args(parser)
00445     parser.add_argument("-c",
00446                         "--certificate",
00447                         help="set client certificate")
00448     parser.add_argument("-k",
00449                         "--private_key",
00450                         help="set client private key")
00451     args = parse_args(parser)
00452 
00453     client = Client(args.url, timeout=args.timeout)
00454     client.set_security_string(args.security)
00455     if args.certificate:
00456         client.load_client_certificate(args.certificate)
00457     if args.private_key:
00458         client.load_private_key(args.private_key)
00459     client.connect()
00460     try:
00461         root = client.get_root_node()
00462         objects = client.get_objects_node()
00463         mynode = get_node(client, args)
00464         embed()
00465     finally:
00466         client.disconnect()
00467     sys.exit(0)
00468 
00469 
00470 def uaserver():
00471     parser = argparse.ArgumentParser(description="Run an example OPC-UA server. By importing xml definition and using uawrite command line, it is even possible to expose real data using this server")
00472     # we setup a server, this is a bit different from other tool so we do not reuse common arguments
00473     parser.add_argument("-u",
00474                         "--url",
00475                         help="URL of OPC UA server, default is opc.tcp://0.0.0.0:4840",
00476                         default='opc.tcp://0.0.0.0:4840',
00477                         metavar="URL")
00478     parser.add_argument("-v",
00479                         "--verbose",
00480                         dest="loglevel",
00481                         choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
00482                         default='WARNING',
00483                         help="Set log level")
00484     parser.add_argument("-x",
00485                         "--xml",
00486                         metavar="XML_FILE",
00487                         help="Populate address space with nodes defined in XML")
00488     parser.add_argument("-p",
00489                         "--populate",
00490                         action="store_true",
00491                         help="Populate address space with some sample nodes")
00492     parser.add_argument("-c",
00493                         "--disable-clock",
00494                         action="store_true",
00495                         help="Disable clock, to avoid seeing many write if debugging an application")
00496     parser.add_argument("-s",
00497                         "--shell",
00498                         action="store_true",
00499                         help="Start python shell instead of randomly changing node values")
00500     parser.add_argument("--certificate",
00501                         help="set server certificate")
00502     parser.add_argument("--private_key",
00503                         help="set server private key")
00504     args = parser.parse_args()
00505     logging.basicConfig(format="%(levelname)s: %(message)s", level=getattr(logging, args.loglevel))
00506 
00507     server = Server()
00508     server.set_endpoint(args.url)
00509     if args.certificate:
00510         server.load_certificate(args.certificate)
00511     if args.private_key:
00512         server.load_private_key(args.private_key)
00513     server.disable_clock(args.disable_clock)
00514     server.set_server_name("FreeOpcUa Example Server")
00515     if args.xml:
00516         server.import_xml(args.xml)
00517     if args.populate:
00518         @uamethod
00519         def multiply(parent, x, y):
00520             print("multiply method call with parameters: ", x, y)
00521             return x * y
00522 
00523         uri = "http://examples.freeopcua.github.io"
00524         idx = server.register_namespace(uri)
00525         objects = server.get_objects_node()
00526         myobj = objects.add_object(idx, "MyObject")
00527         mywritablevar = myobj.add_variable(idx, "MyWritableVariable", 6.7)
00528         mywritablevar.set_writable()    # Set MyVariable to be writable by clients
00529         myvar = myobj.add_variable(idx, "MyVariable", 6.7)
00530         myarrayvar = myobj.add_variable(idx, "MyVarArray", [6.7, 7.9])
00531         myprop = myobj.add_property(idx, "MyProperty", "I am a property")
00532         mymethod = myobj.add_method(idx, "MyMethod", multiply, [ua.VariantType.Double, ua.VariantType.Int64], [ua.VariantType.Double])
00533 
00534     server.start()
00535     try:
00536         if args.shell:
00537             embed()
00538         elif args.populate:
00539             count = 0
00540             while True:
00541                 time.sleep(1)
00542                 myvar.set_value(math.sin(count / 10))
00543                 myarrayvar.set_value([math.sin(count / 10), math.sin(count / 100)])
00544                 count += 1
00545         else:
00546             while True:
00547                 time.sleep(1)
00548     finally:
00549         server.stop()
00550     sys.exit(0)
00551 
00552 
00553 def uadiscover():
00554     parser = argparse.ArgumentParser(description="Performs OPC UA discovery and prints information on servers and endpoints.")
00555     add_minimum_args(parser)
00556     parser.add_argument("-n",
00557                         "--network",
00558                         action="store_true",
00559                         help="Also send a FindServersOnNetwork request to server")
00560     #parser.add_argument("-s",
00561                         #"--servers",
00562                         #action="store_false",
00563                         #help="send a FindServers request to server")
00564     #parser.add_argument("-e",
00565                         #"--endpoints",
00566                         #action="store_false",
00567                         #help="send a GetEndpoints request to server")
00568     args = parse_args(parser)
00569 
00570     client = Client(args.url, timeout=args.timeout)
00571 
00572     if args.network:
00573         print("Performing discovery at {0}\n".format(args.url))
00574         for i, server in enumerate(client.connect_and_find_servers_on_network(), start=1):
00575             print('Server {0}:'.format(i))
00576             #for (n, v) in application_to_strings(server):
00577                 #print('  {}: {}'.format(n, v))
00578             print('')
00579 
00580     print("Performing discovery at {0}\n".format(args.url))
00581     for i, server in enumerate(client.connect_and_find_servers(), start=1):
00582         print('Server {0}:'.format(i))
00583         for (n, v) in application_to_strings(server):
00584             print('  {0}: {1}'.format(n, v))
00585         print('')
00586 
00587     for i, ep in enumerate(client.connect_and_get_server_endpoints(), start=1):
00588         print('Endpoint {0}:'.format(i))
00589         for (n, v) in endpoint_to_strings(ep):
00590             print('  {0}: {1}'.format(n, v))
00591         print('')
00592 
00593     sys.exit(0)
00594 
00595 
00596 def print_history(o):
00597     if isinstance(o, ua.HistoryData):
00598         print("{0:30} {1:10} {2}".format('Source timestamp', 'Status', 'Value'))
00599         for d in o.DataValues:
00600             print("{0:30} {1:10} {2}".format(str(d.SourceTimestamp), d.StatusCode.name, d.Value))
00601 
00602 
00603 def str_to_datetime(s, default=None):
00604     if not s:
00605         if default is not None:
00606             return default
00607         return datetime.utcnow()
00608     # FIXME: try different datetime formats
00609     for fmt in ["%Y-%m-%d", "%Y-%m-%d %H:%M", "%Y-%m-%d %H:%M:%S"]:
00610         try:
00611             return datetime.strptime(s, fmt)
00612         except ValueError:
00613             pass
00614 
00615 
00616 def uahistoryread():
00617     parser = argparse.ArgumentParser(description="Read history of a node")
00618     add_common_args(parser)
00619     parser.add_argument("--starttime",
00620                         default=None,
00621                         help="Start time, formatted as YYYY-MM-DD [HH:MM[:SS]]. Default: current time - one day")
00622     parser.add_argument("--endtime",
00623                         default=None,
00624                         help="End time, formatted as YYYY-MM-DD [HH:MM[:SS]]. Default: current time")
00625     parser.add_argument("-e",
00626                         "--events",
00627                         action="store_true",
00628                         help="Read event history instead of data change history")
00629     parser.add_argument("-l",
00630                         "--limit",
00631                         type=int,
00632                         default=10,
00633                         help="Maximum number of notfication to return")
00634 
00635     args = parse_args(parser, requirenodeid=True)
00636 
00637     client = Client(args.url, timeout=args.timeout)
00638     client.set_security_string(args.security)
00639     client.connect()
00640     try:
00641         node = get_node(client, args)
00642         starttime = str_to_datetime(args.starttime, datetime.utcnow() - timedelta(days=1))
00643         endtime = str_to_datetime(args.endtime, datetime.utcnow())
00644         print("Reading raw history of node {0} at {1}; start at {2}, end at {3}\n".format(node, args.url, starttime, endtime))
00645         if args.events:
00646             evs = node.read_event_history(starttime, endtime, numvalues=args.limit)
00647             for ev in evs:
00648                 print(ev)
00649         else:
00650             print_history(node.read_raw_history(starttime, endtime, numvalues=args.limit))
00651     finally:
00652         client.disconnect()
00653     sys.exit(0)
00654 
00655 
00656 def uacall():
00657     parser = argparse.ArgumentParser(description="Call method of a node")
00658     add_common_args(parser)
00659     parser.add_argument("-m",
00660                         "--method",
00661                         dest="method",
00662                         type=int,
00663                         default=None,
00664                         help="Set method to call. If not given then (single) method of the selected node is used.")
00665     parser.add_argument("-l",
00666                         "--list",
00667                         "--array",
00668                         dest="array",
00669                         default="guess",
00670                         choices=["guess", "true", "false"],
00671                         help="Value is an array")
00672     parser.add_argument("-t",
00673                         "--datatype",
00674                         dest="datatype",
00675                         default="guess",
00676                         choices=["guess", 'byte', 'sbyte', 'nodeid', 'expandednodeid', 'qualifiedname', 'browsename', 'string', 'float', 'double', 'int16', 'int32', "int64", 'uint16', 'uint32', 'uint64', "bool", "string", 'datetime', 'bytestring', 'xmlelement', 'statuscode', 'localizedtext'],
00677                         help="Data type to return")
00678     parser.add_argument("value",
00679                         help="Value to use for call to method, if any",
00680                         nargs="?",
00681                         metavar="VALUE")
00682 
00683     args = parse_args(parser, requirenodeid=True)
00684 
00685     client = Client(args.url, timeout=args.timeout)
00686     client.set_security_string(args.security)
00687     client.connect()
00688     try:
00689         node = get_node(client, args)
00690         # val must be a tuple in order to enable method calls without arguments
00691         if ( args.value is None ):
00692             val = () #empty tuple
00693         else:
00694             val = (_val_to_variant(args.value, args),) # tuple with one element
00695 
00696         # determine method to call: Either explicitly given or automatically select the method of the selected node.
00697         methods = node.get_methods()
00698         method_id = None
00699         #print( "methods=%s" % (methods) )
00700 
00701         if ( args.method is None ):
00702             if ( len( methods ) == 0 ):
00703                 raise ValueError( "No methods in selected node and no method given" )
00704             elif ( len( methods ) == 1 ):
00705                 method_id = methods[0]
00706             else:
00707                 raise ValueError( "Selected node has {0:d} methods but no method given. Provide one of {1!s}".format(*(methods)) )
00708         else:
00709             for m in methods:
00710                 if ( m.nodeid.Identifier == args.method ):
00711                     method_id = m.nodeid
00712                     break
00713 
00714         if ( method_id is None):
00715             # last resort:
00716             method_id = ua.NodeId( identifier=args.method )#, namespaceidx=? )#, nodeidtype=?): )
00717 
00718         #print( "method_id=%s\nval=%s" % (method_id,val) )
00719 
00720         result_variants = node.call_method( method_id, *val )
00721         print( "resulting result_variants={0!s}".format(result_variants) )
00722     finally:
00723         client.disconnect()
00724     sys.exit(0)
00725     print(args)


ros_opcua_impl_python_opcua
Author(s): Denis Štogl , Daniel Draper
autogenerated on Sat Jun 8 2019 18:26:23