4 from datetime
import datetime, timedelta
9 from IPython
import embed
14 code.interact(local=dict(globals(), **locals()))
17 from opcua
import Client
18 from opcua
import Server
19 from opcua
import Node
20 from opcua
import uamethod
24 parser.add_argument(
"-u",
26 help=
"URL of OPC UA server (for example: opc.tcp://example.org:4840)",
27 default=
'opc.tcp://localhost:4840',
29 parser.add_argument(
"-v",
32 choices=[
'DEBUG',
'INFO',
'WARNING',
'ERROR',
'CRITICAL'],
35 parser.add_argument(
"--timeout",
39 help=
"Set socket timeout (NOT the diverse UA timeouts)")
44 parser.add_argument(
"-n",
46 help=
"Fully-qualified node ID (for example: i=85). Default: root node",
49 parser.add_argument(
"-p",
51 help=
"Comma separated browse path to the node starting at NODE (for example: 3:Mybject,3:MyVariable)",
54 parser.add_argument(
"-i",
56 help=
"Default namespace",
60 parser.add_argument(
"--security",
61 help=
"Security settings, for example: Basic256,SignAndEncrypt,cert.der,pk.pem[,server_cert.der]. Default: None",
67 if args.nodeid ==
"i=84" and args.path ==
"":
69 print(
"{0}: error: A NodeId or BrowsePath is required".format(parser.prog))
74 args = parser.parse_args()
75 logging.basicConfig(format=
"%(levelname)s: %(message)s", level=getattr(logging, args.loglevel))
76 if args.url
and '://' not in args.url:
77 logging.info(
"Adding default scheme %s to URL %s", ua.OPC_TCP_SCHEME, args.url)
78 args.url = ua.OPC_TCP_SCHEME +
'://' + args.url
85 node = client.get_node(args.nodeid)
87 path = args.path.split(
",")
88 if node.nodeid ==
ua.NodeId(84, 0)
and path[0] ==
"0:Root":
91 node = node.get_child(path)
96 parser = argparse.ArgumentParser(description=
"Read attribute of a node, per default reads value of a node")
98 parser.add_argument(
"-a",
102 default=ua.AttributeIds.Value,
103 help=
"Set attribute to read")
104 parser.add_argument(
"-t",
108 choices=[
'python',
'variant',
'datavalue'],
109 help=
"Data type to return")
113 client = Client(args.url, timeout=args.timeout)
114 client.set_security_string(args.security)
118 attr = node.get_attribute(args.attribute)
119 if args.datatype ==
"python":
120 print(attr.Value.Value)
121 elif args.datatype ==
"variant":
141 return val
in (
"true",
"True")
146 if isinstance(val, list):
147 val = [ptype(i)
for i
in val]
158 if args.datatype ==
"guess":
159 if val
in (
"true",
"True",
"false",
"False"):
168 elif args.datatype ==
"bool":
169 if val
in (
"1",
"True",
"true"):
170 return ua.Variant(
True, ua.VariantType.Boolean)
172 return ua.Variant(
False, ua.VariantType.Boolean)
173 elif args.datatype ==
"sbyte":
175 elif args.datatype ==
"byte":
179 elif args.datatype ==
"uint16":
181 elif args.datatype ==
"uint32":
183 elif args.datatype ==
"uint64":
187 elif args.datatype ==
"int16":
189 elif args.datatype ==
"int32":
191 elif args.datatype ==
"int64":
193 elif args.datatype ==
"float":
195 elif args.datatype ==
"double":
197 elif args.datatype ==
"string":
199 elif args.datatype ==
"datetime":
200 raise NotImplementedError
201 elif args.datatype ==
"Guid":
203 elif args.datatype ==
"ByteString":
205 elif args.datatype ==
"xml":
207 elif args.datatype ==
"nodeid":
208 return _arg_to_variant(val, array, ua.NodeId.from_string, ua.VariantType.NodeId)
209 elif args.datatype ==
"expandednodeid":
210 return _arg_to_variant(val, array, ua.ExpandedNodeId.from_string, ua.VariantType.ExpandedNodeId)
211 elif args.datatype ==
"statuscode":
213 elif args.datatype
in (
"qualifiedname",
"browsename"):
214 return _arg_to_variant(val, array, ua.QualifiedName.from_string, ua.VariantType.QualifiedName)
215 elif args.datatype ==
"LocalizedText":
220 parser = argparse.ArgumentParser(description=
"Write attribute of a node, per default write value of node")
222 parser.add_argument(
"-a",
226 default=ua.AttributeIds.Value,
227 help=
"Set attribute to read")
228 parser.add_argument(
"-l",
233 choices=[
"guess",
"true",
"false"],
234 help=
"Value is an array")
235 parser.add_argument(
"-t",
239 choices=[
"guess",
'byte',
'sbyte',
'nodeid',
'expandednodeid',
'qualifiedname',
'browsename',
'string',
'float',
'double',
'int16',
'int32',
"int64",
'uint16',
'uint32',
'uint64',
"bool",
"string",
'datetime',
'bytestring',
'xmlelement',
'statuscode',
'localizedtext'],
240 help=
"Data type to return")
241 parser.add_argument(
"value",
242 help=
"Value to be written",
246 client = Client(args.url, timeout=args.timeout)
247 client.set_security_string(args.security)
260 parser = argparse.ArgumentParser(description=
"Browse OPC-UA node and print result")
262 parser.add_argument(
"-l",
267 help=
"use a long listing format")
268 parser.add_argument(
"-d",
275 if args.long_format
is None:
278 client = Client(args.url, timeout=args.timeout)
279 client.set_security_string(args.security)
283 print(
"Browsing node {0} at {1}\n".format(node, args.url))
284 if args.long_format == 0:
286 elif args.long_format == 1:
298 print(
"{0:30} {1:25}".format(
"DisplayName",
"NodeId"))
300 for desc
in node.get_children_descriptions():
301 print(
"{0}{1:30} {2:25}".format(indent, desc.DisplayName.to_string(), desc.NodeId.to_string()))
303 _lsprint_0(Node(node.server, desc.NodeId), depth - 1, indent +
" ")
308 print(
"{0:30} {1:25} {2:25} {3:25}".format(
"DisplayName",
"NodeId",
"BrowseName",
"Value"))
311 for desc
in node.get_children_descriptions():
312 if desc.NodeClass == ua.NodeClass.Variable:
313 val = Node(node.server, desc.NodeId).get_value()
314 print(
"{0}{1:30} {2!s:25} {3!s:25}, {4!s:3}".format(indent, desc.DisplayName.to_string(), desc.NodeId.to_string(), desc.BrowseName.to_string(), val))
316 print(
"{0}{1:30} {2!s:25} {3!s:25}".format(indent, desc.DisplayName.to_string(), desc.NodeId.to_string(), desc.BrowseName.to_string()))
318 _lsprint_1(Node(node.server, desc.NodeId), depth - 1, indent +
" ")
323 print(
"{0:30} {1:25} {2:25} {3:10} {4:30} {5:25}".format(
"DisplayName",
"NodeId",
"BrowseName",
"DataType",
"Timestamp",
"Value"))
325 for node
in pnode.get_children():
326 attrs = node.get_attributes([ua.AttributeIds.DisplayName,
327 ua.AttributeIds.BrowseName,
328 ua.AttributeIds.NodeClass,
329 ua.AttributeIds.WriteMask,
330 ua.AttributeIds.UserWriteMask,
331 ua.AttributeIds.DataType,
332 ua.AttributeIds.Value])
333 name, bname, nclass, mask, umask, dtype, val = [attr.Value.Value
for attr
in attrs]
334 update = attrs[-1].ServerTimestamp
335 if nclass == ua.NodeClass.Variable:
336 print(
"{0}{1:30} {2:25} {3:25} {4:10} {5!s:30} {6!s:25}".format(indent, name.to_string(), node.nodeid.to_string(), bname.to_string(), dtype.to_string(), update, val))
338 print(
"{0}{1:30} {2:25} {3:25}".format(indent, name.to_string(), bname.to_string(), node.nodeid.to_string()))
346 print(
"New data change event", node, val, data)
349 print(
"New event", event)
353 parser = argparse.ArgumentParser(description=
"Subscribe to a node and print results")
355 parser.add_argument(
"-t",
358 default=
"datachange",
359 choices=[
'datachange',
'event'],
360 help=
"Event type to subscribe to")
362 args =
parse_args(parser, requirenodeid=
False)
363 if args.eventtype ==
"datachange":
367 if args.nodeid ==
"i=84" and args.path ==
"":
368 args.nodeid =
"i=2253" 370 client = Client(args.url, timeout=args.timeout)
371 client.set_security_string(args.security)
376 sub = client.create_subscription(500, handler)
377 if args.eventtype ==
"datachange":
378 sub.subscribe_data_change(node)
380 sub.subscribe_events(node)
381 print(
"Type Ctr-C to exit")
392 result.append((
'Application URI', app.ApplicationUri))
394 (
'Product URI', app.ProductUri),
395 (
'Application Name', app.ApplicationName.to_string()),
396 (
'Application Type', str(app.ApplicationType)),
397 (
'Gateway Server URI', app.GatewayServerUri),
398 (
'Discovery Profile URI', app.DiscoveryProfileUri),
400 for (n, v)
in optionals:
402 result.append((n, v))
403 for url
in app.DiscoveryUrls:
404 result.append((
'Discovery URL', url))
410 return '[no certificate]' 414 return "{0} bytes".format(len(der))
415 cert = uacrypto.x509_from_der(der)
416 return uacrypto.x509_to_string(cert)
420 result = [(
'Endpoint URL', ep.EndpointUrl)]
424 (
'Security Mode', str(ep.SecurityMode)),
425 (
'Security Policy URI', ep.SecurityPolicyUri)]
426 for tok
in ep.UserIdentityTokens:
428 (
'User policy', tok.PolicyId),
429 (
' Token type', str(tok.TokenType))]
430 if tok.IssuedTokenType
or tok.IssuerEndpointUrl:
432 (
' Issued Token type', tok.IssuedTokenType),
433 (
' Issuer Endpoint URL', tok.IssuerEndpointUrl)]
434 if tok.SecurityPolicyUri:
435 result.append((
' Security Policy URI', tok.SecurityPolicyUri))
437 (
'Transport Profile URI', ep.TransportProfileUri),
438 (
'Security Level', ep.SecurityLevel)]
443 parser = argparse.ArgumentParser(description=
"Connect to server and start python shell. root and objects nodes are available. Node specificed in command line is available as mynode variable")
445 parser.add_argument(
"-c",
447 help=
"set client certificate")
448 parser.add_argument(
"-k",
450 help=
"set client private key")
453 client = Client(args.url, timeout=args.timeout)
454 client.set_security_string(args.security)
456 client.load_client_certificate(args.certificate)
458 client.load_private_key(args.private_key)
461 root = client.get_root_node()
462 objects = client.get_objects_node()
471 parser = argparse.ArgumentParser(description=
"Run an example OPC-UA server. By importing xml definition and using uawrite command line, it is even possible to expose real data using this server")
473 parser.add_argument(
"-u",
475 help=
"URL of OPC UA server, default is opc.tcp://0.0.0.0:4840",
476 default=
'opc.tcp://0.0.0.0:4840',
478 parser.add_argument(
"-v",
481 choices=[
'DEBUG',
'INFO',
'WARNING',
'ERROR',
'CRITICAL'],
483 help=
"Set log level")
484 parser.add_argument(
"-x",
487 help=
"Populate address space with nodes defined in XML")
488 parser.add_argument(
"-p",
491 help=
"Populate address space with some sample nodes")
492 parser.add_argument(
"-c",
495 help=
"Disable clock, to avoid seeing many write if debugging an application")
496 parser.add_argument(
"-s",
499 help=
"Start python shell instead of randomly changing node values")
500 parser.add_argument(
"--certificate",
501 help=
"set server certificate")
502 parser.add_argument(
"--private_key",
503 help=
"set server private key")
504 args = parser.parse_args()
505 logging.basicConfig(format=
"%(levelname)s: %(message)s", level=getattr(logging, args.loglevel))
508 server.set_endpoint(args.url)
510 server.load_certificate(args.certificate)
512 server.load_private_key(args.private_key)
513 server.disable_clock(args.disable_clock)
514 server.set_server_name(
"FreeOpcUa Example Server")
516 server.import_xml(args.xml)
520 print(
"multiply method call with parameters: ", x, y)
523 uri =
"http://examples.freeopcua.github.io" 524 idx = server.register_namespace(uri)
525 objects = server.get_objects_node()
526 myobj = objects.add_object(idx,
"MyObject")
527 mywritablevar = myobj.add_variable(idx,
"MyWritableVariable", 6.7)
528 mywritablevar.set_writable()
529 myvar = myobj.add_variable(idx,
"MyVariable", 6.7)
530 myarrayvar = myobj.add_variable(idx,
"MyVarArray", [6.7, 7.9])
531 myprop = myobj.add_property(idx,
"MyProperty",
"I am a property")
532 mymethod = myobj.add_method(idx,
"MyMethod", multiply, [ua.VariantType.Double, ua.VariantType.Int64], [ua.VariantType.Double])
542 myvar.set_value(math.sin(count / 10))
543 myarrayvar.set_value([math.sin(count / 10), math.sin(count / 100)])
554 parser = argparse.ArgumentParser(description=
"Performs OPC UA discovery and prints information on servers and endpoints.")
556 parser.add_argument(
"-n",
559 help=
"Also send a FindServersOnNetwork request to server")
570 client = Client(args.url, timeout=args.timeout)
573 print(
"Performing discovery at {0}\n".format(args.url))
574 for i, server
in enumerate(client.connect_and_find_servers_on_network(), start=1):
575 print(
'Server {0}:'.format(i))
580 print(
"Performing discovery at {0}\n".format(args.url))
581 for i, server
in enumerate(client.connect_and_find_servers(), start=1):
582 print(
'Server {0}:'.format(i))
584 print(
' {0}: {1}'.format(n, v))
587 for i, ep
in enumerate(client.connect_and_get_server_endpoints(), start=1):
588 print(
'Endpoint {0}:'.format(i))
590 print(
' {0}: {1}'.format(n, v))
598 print(
"{0:30} {1:10} {2}".format(
'Source timestamp',
'Status',
'Value'))
599 for d
in o.DataValues:
600 print(
"{0:30} {1:10} {2}".format(str(d.SourceTimestamp), d.StatusCode.name, d.Value))
605 if default
is not None:
607 return datetime.utcnow()
609 for fmt
in [
"%Y-%m-%d",
"%Y-%m-%d %H:%M",
"%Y-%m-%d %H:%M:%S"]:
611 return datetime.strptime(s, fmt)
617 parser = argparse.ArgumentParser(description=
"Read history of a node")
619 parser.add_argument(
"--starttime",
621 help=
"Start time, formatted as YYYY-MM-DD [HH:MM[:SS]]. Default: current time - one day")
622 parser.add_argument(
"--endtime",
624 help=
"End time, formatted as YYYY-MM-DD [HH:MM[:SS]]. Default: current time")
625 parser.add_argument(
"-e",
628 help=
"Read event history instead of data change history")
629 parser.add_argument(
"-l",
633 help=
"Maximum number of notfication to return")
637 client = Client(args.url, timeout=args.timeout)
638 client.set_security_string(args.security)
642 starttime =
str_to_datetime(args.starttime, datetime.utcnow() - timedelta(days=1))
644 print(
"Reading raw history of node {0} at {1}; start at {2}, end at {3}\n".format(node, args.url, starttime, endtime))
646 evs = node.read_event_history(starttime, endtime, numvalues=args.limit)
650 print_history(node.read_raw_history(starttime, endtime, numvalues=args.limit))
657 parser = argparse.ArgumentParser(description=
"Call method of a node")
659 parser.add_argument(
"-m",
664 help=
"Set method to call. If not given then (single) method of the selected node is used.")
665 parser.add_argument(
"-l",
670 choices=[
"guess",
"true",
"false"],
671 help=
"Value is an array")
672 parser.add_argument(
"-t",
676 choices=[
"guess",
'byte',
'sbyte',
'nodeid',
'expandednodeid',
'qualifiedname',
'browsename',
'string',
'float',
'double',
'int16',
'int32',
"int64",
'uint16',
'uint32',
'uint64',
"bool",
"string",
'datetime',
'bytestring',
'xmlelement',
'statuscode',
'localizedtext'],
677 help=
"Data type to return")
678 parser.add_argument(
"value",
679 help=
"Value to use for call to method, if any",
685 client = Client(args.url, timeout=args.timeout)
686 client.set_security_string(args.security)
691 if ( args.value
is None ):
697 methods = node.get_methods()
701 if ( args.method
is None ):
702 if ( len( methods ) == 0 ):
703 raise ValueError(
"No methods in selected node and no method given" )
704 elif ( len( methods ) == 1 ):
705 method_id = methods[0]
707 raise ValueError(
"Selected node has {0:d} methods but no method given. Provide one of {1!s}".format(*(methods)) )
710 if ( m.nodeid.Identifier == args.method ):
714 if ( method_id
is None):
716 method_id =
ua.NodeId( identifier=args.method )
720 result_variants = node.call_method( method_id, *val )
721 print(
"resulting result_variants={0!s}".format(result_variants) )
def multiply(parent, x, y)