tools.py
Go to the documentation of this file.
1 import logging
2 import sys
3 import argparse
4 from datetime import datetime, timedelta
5 import math
6 import time
7 
try:
    from IPython import embed
except ImportError:
    import code

    def embed():
        """Open a plain ``code.interact`` console when IPython is missing.

        The console namespace is this module's globals overlaid with the local
        variables of the function that called ``embed()``, so names the caller
        prepared for the user are reachable from the prompt.
        """
        import inspect

        namespace = dict(globals())
        frame = inspect.currentframe()
        # currentframe() may return None on interpreters without stack-frame
        # support; in that case only the module globals are exposed.
        caller = frame.f_back if frame is not None else None
        if caller is not None:
            namespace.update(caller.f_locals)
        code.interact(local=namespace)
15 
16 from opcua import ua
17 from opcua import Client
18 from opcua import Server
19 from opcua import Node
20 from opcua import uamethod
21 
22 
def add_minimum_args(parser):
    """Register the options every tool needs on *parser*.

    Adds ``-u/--url``, ``-v/--verbose`` (stored as ``loglevel``) and
    ``--timeout`` (stored as an ``int``).
    """
    option_table = (
        (("-u", "--url"),
         dict(metavar="URL",
              default='opc.tcp://localhost:4840',
              help="URL of OPC UA server (for example: opc.tcp://example.org:4840)")),
        (("-v", "--verbose"),
         dict(dest="loglevel",
              default='WARNING',
              choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
              help="Set log level")),
        (("--timeout",),
         dict(dest="timeout",
              default=1,
              type=int,
              help="Set socket timeout (NOT the diverse UA timeouts)")),
    )
    for flags, settings in option_table:
        parser.add_argument(*flags, **settings)
40 
41 
def add_common_args(parser, default_node='i=84'):
    """Register the minimum options plus node selection and security options.

    On top of what ``add_minimum_args`` adds, this registers ``-n/--nodeid``
    (defaulting to *default_node*), ``-p/--path``, ``-i/--namespace`` and
    ``--security``.
    """
    add_minimum_args(parser)
    option_table = (
        (("-n", "--nodeid"),
         dict(metavar="NODE",
              default=default_node,
              help="Fully-qualified node ID (for example: i=85). Default: root node")),
        (("-p", "--path"),
         dict(metavar="BROWSEPATH",
              default='',
              help="Comma separated browse path to the node starting at NODE (for example: 3:Mybject,3:MyVariable)")),
        (("-i", "--namespace"),
         dict(metavar="NAMESPACE",
              default=0,
              type=int,
              help="Default namespace")),
        (("--security",),
         dict(default='',
              help="Security settings, for example: Basic256,SignAndEncrypt,cert.der,pk.pem[,server_cert.der]. Default: None")),
    )
    for flags, settings in option_table:
        parser.add_argument(*flags, **settings)
63 
64 
65 def _require_nodeid(parser, args):
66  # check that a nodeid has been given explicitly, a bit hackish...
67  if args.nodeid == "i=84" and args.path == "":
68  parser.print_usage()
69  print("{0}: error: A NodeId or BrowsePath is required".format(parser.prog))
70  sys.exit(1)
71 
72 
def parse_args(parser, requirenodeid=False):
    """Parse the command line, configure logging and normalise the URL.

    A URL without a ``://`` scheme separator gets ``ua.OPC_TCP_SCHEME``
    prepended.  When *requirenodeid* is true the process exits if no node was
    selected (see ``_require_nodeid``).  Returns the parsed namespace.
    """
    args = parser.parse_args()
    log_level = getattr(logging, args.loglevel)
    logging.basicConfig(format="%(levelname)s: %(message)s", level=log_level)

    url = args.url
    if url and '://' not in url:
        logging.info("Adding default scheme %s to URL %s", ua.OPC_TCP_SCHEME, url)
        args.url = ua.OPC_TCP_SCHEME + '://' + url

    if requirenodeid:
        _require_nodeid(parser, args)
    return args
82 
83 
def get_node(client, args):
    """Resolve the node selected by ``args.nodeid`` and ``args.path``.

    Without a browse path the node for ``args.nodeid`` is returned directly.
    With one, the comma separated path is followed from that node; a leading
    ``0:Root`` element is dropped when the start node is already ``i=84``.
    """
    node = client.get_node(args.nodeid)
    if not args.path:
        return node

    steps = args.path.split(",")
    if node.nodeid == ua.NodeId(84, 0) and steps[0] == "0:Root":
        # let user specify root if no node given
        steps = steps[1:]
    return node.get_child(steps)
93 
94 
def uaread():
    """Command line entry point: read one attribute of a node and print it.

    ``-a`` selects the attribute id (default: ``ua.AttributeIds.Value``) and
    ``-t`` selects how much of the result is printed: the bare Python value,
    the variant, or the whole data value.  The client is always disconnected
    before the process exits with status 0.
    """
    parser = argparse.ArgumentParser(description="Read attribute of a node, per default reads value of a node")
    add_common_args(parser)
    parser.add_argument("-a",
                        "--attribute",
                        dest="attribute",
                        type=int,
                        default=ua.AttributeIds.Value,
                        help="Set attribute to read")
    parser.add_argument("-t",
                        "--datatype",
                        dest="datatype",
                        default="python",
                        choices=['python', 'variant', 'datavalue'],
                        help="Data type to return")

    args = parse_args(parser, requirenodeid=True)

    client = Client(args.url, timeout=args.timeout)
    client.set_security_string(args.security)
    client.connect()
    try:
        node = get_node(client, args)
        attr = node.get_attribute(args.attribute)
        if args.datatype == "python":
            print(attr.Value.Value)
        elif args.datatype == "variant":
            print(attr.Value)
        else:
            print(attr)
    finally:
        client.disconnect()
    # Nothing may follow this call: the former trailing print(args) was
    # unreachable and has been removed.
    sys.exit(0)
129 
130 
131 def _args_to_array(val, array):
132  if array == "guess":
133  if "," in val:
134  array = "true"
135  if array == "true":
136  val = val.split(",")
137  return val
138 
139 
140 def _arg_to_bool(val):
141  return val in ("true", "True")
142 
143 
def _arg_to_variant(val, array, ptype, varianttype=None):
    """Convert a command line string into a ``ua.Variant``.

    *val* is first split according to *array* (see ``_args_to_array``), then
    every piece (or the single value) is passed through *ptype*.  The variant
    is built with *varianttype* when one is given, otherwise ``ua.Variant``
    is called with the value alone.
    """
    parsed = _args_to_array(val, array)
    if isinstance(parsed, list):
        converted = [ptype(item) for item in parsed]
    else:
        converted = ptype(parsed)

    if not varianttype:
        return ua.Variant(converted)
    return ua.Variant(converted, varianttype)
154 
155 
156 def _val_to_variant(val, args):
157  array = args.array
158  if args.datatype == "guess":
159  if val in ("true", "True", "false", "False"):
160  return _arg_to_variant(val, array, _arg_to_bool)
161  try:
162  return _arg_to_variant(val, array, int)
163  except ValueError:
164  try:
165  return _arg_to_variant(val, array, float)
166  except ValueError:
167  return _arg_to_variant(val, array, str)
168  elif args.datatype == "bool":
169  if val in ("1", "True", "true"):
170  return ua.Variant(True, ua.VariantType.Boolean)
171  else:
172  return ua.Variant(False, ua.VariantType.Boolean)
173  elif args.datatype == "sbyte":
174  return _arg_to_variant(val, array, int, ua.VariantType.SByte)
175  elif args.datatype == "byte":
176  return _arg_to_variant(val, array, int, ua.VariantType.Byte)
177  #elif args.datatype == "uint8":
178  #return _arg_to_variant(val, array, int, ua.VariantType.Byte)
179  elif args.datatype == "uint16":
180  return _arg_to_variant(val, array, int, ua.VariantType.UInt16)
181  elif args.datatype == "uint32":
182  return _arg_to_variant(val, array, int, ua.VariantType.UInt32)
183  elif args.datatype == "uint64":
184  return _arg_to_variant(val, array, int, ua.VariantType.UInt64)
185  #elif args.datatype == "int8":
186  #return ua.Variant(int(val), ua.VariantType.Int8)
187  elif args.datatype == "int16":
188  return _arg_to_variant(val, array, int, ua.VariantType.Int16)
189  elif args.datatype == "int32":
190  return _arg_to_variant(val, array, int, ua.VariantType.Int32)
191  elif args.datatype == "int64":
192  return _arg_to_variant(val, array, int, ua.VariantType.Int64)
193  elif args.datatype == "float":
194  return _arg_to_variant(val, array, float, ua.VariantType.Float)
195  elif args.datatype == "double":
196  return _arg_to_variant(val, array, float, ua.VariantType.Double)
197  elif args.datatype == "string":
198  return _arg_to_variant(val, array, str, ua.VariantType.String)
199  elif args.datatype == "datetime":
200  raise NotImplementedError
201  elif args.datatype == "Guid":
202  return _arg_to_variant(val, array, bytes, ua.VariantType.Guid)
203  elif args.datatype == "ByteString":
204  return _arg_to_variant(val, array, bytes, ua.VariantType.ByteString)
205  elif args.datatype == "xml":
206  return _arg_to_variant(val, array, str, ua.VariantType.XmlElement)
207  elif args.datatype == "nodeid":
208  return _arg_to_variant(val, array, ua.NodeId.from_string, ua.VariantType.NodeId)
209  elif args.datatype == "expandednodeid":
210  return _arg_to_variant(val, array, ua.ExpandedNodeId.from_string, ua.VariantType.ExpandedNodeId)
211  elif args.datatype == "statuscode":
212  return _arg_to_variant(val, array, int, ua.VariantType.StatusCode)
213  elif args.datatype in ("qualifiedname", "browsename"):
214  return _arg_to_variant(val, array, ua.QualifiedName.from_string, ua.VariantType.QualifiedName)
215  elif args.datatype == "LocalizedText":
216  return _arg_to_variant(val, array, ua.LocalizedText, ua.VariantType.LocalizedText)
217 
218 
def uawrite():
    """Command line entry point: write one attribute of a node.

    The positional VALUE is converted with ``_val_to_variant`` according to
    ``-t/--datatype`` and ``-l/--list/--array`` and written to the attribute
    selected with ``-a`` (default: ``ua.AttributeIds.Value``).  The client is
    always disconnected before the process exits with status 0.
    """
    parser = argparse.ArgumentParser(description="Write attribute of a node, per default write value of node")
    add_common_args(parser)
    parser.add_argument("-a",
                        "--attribute",
                        dest="attribute",
                        type=int,
                        default=ua.AttributeIds.Value,
                        help="Set attribute to write")
    parser.add_argument("-l",
                        "--list",
                        "--array",
                        dest="array",
                        default="guess",
                        choices=["guess", "true", "false"],
                        help="Value is an array")
    parser.add_argument("-t",
                        "--datatype",
                        dest="datatype",
                        default="guess",
                        choices=["guess", 'byte', 'sbyte', 'nodeid', 'expandednodeid', 'qualifiedname', 'browsename', 'string', 'float', 'double', 'int16', 'int32', "int64", 'uint16', 'uint32', 'uint64', "bool", 'datetime', 'bytestring', 'xmlelement', 'statuscode', 'localizedtext'],
                        help="Data type of the value to write")
    parser.add_argument("value",
                        help="Value to be written",
                        metavar="VALUE")
    args = parse_args(parser, requirenodeid=True)

    client = Client(args.url, timeout=args.timeout)
    client.set_security_string(args.security)
    client.connect()
    try:
        node = get_node(client, args)
        val = _val_to_variant(args.value, args)
        node.set_attribute(args.attribute, ua.DataValue(val))
    finally:
        client.disconnect()
    # The former trailing print(args) could never run and has been removed.
    sys.exit(0)
257 
258 
def uals():
    """Command line entry point: browse below a node and print its children.

    ``-l`` selects the listing format: absent means format 1, a bare ``-l``
    means 3, and an explicit number is used as given (0 -> ``_lsprint_0``,
    1 -> ``_lsprint_1``, anything else -> ``_lsprint_long``).  ``-d`` sets how
    many levels are browsed.  The client is always disconnected before the
    process exits with status 0.
    """
    parser = argparse.ArgumentParser(description="Browse OPC-UA node and print result")
    add_common_args(parser)
    parser.add_argument("-l",
                        dest="long_format",
                        const=3,
                        nargs="?",
                        type=int,
                        help="use a long listing format")
    parser.add_argument("-d",
                        "--depth",
                        default=1,
                        type=int,
                        help="Browse depth")

    args = parse_args(parser)
    if args.long_format is None:
        args.long_format = 1

    client = Client(args.url, timeout=args.timeout)
    client.set_security_string(args.security)
    client.connect()
    try:
        node = get_node(client, args)
        print("Browsing node {0} at {1}\n".format(node, args.url))
        # The printers take the number of *additional* levels to descend.
        if args.long_format == 0:
            _lsprint_0(node, args.depth - 1)
        elif args.long_format == 1:
            _lsprint_1(node, args.depth - 1)
        else:
            _lsprint_long(node, args.depth - 1)
    finally:
        client.disconnect()
    # The former trailing print(args) could never run and has been removed.
    sys.exit(0)
294 
295 
296 def _lsprint_0(node, depth, indent=""):
297  if not indent:
298  print("{0:30} {1:25}".format("DisplayName", "NodeId"))
299  print("")
300  for desc in node.get_children_descriptions():
301  print("{0}{1:30} {2:25}".format(indent, desc.DisplayName.to_string(), desc.NodeId.to_string()))
302  if depth:
303  _lsprint_0(Node(node.server, desc.NodeId), depth - 1, indent + " ")
304 
305 
def _lsprint_1(node, depth, indent=""):
    """Print display name, node id and browse name of every child of *node*.

    Children whose node class is ``Variable`` additionally get their current
    value read and printed.  A column header is printed only on the outermost
    call (empty *indent*); while *depth* is non-zero each child is browsed
    recursively with one more level of indentation.
    """
    at_top_level = not indent
    if at_top_level:
        print("{0:30} {1:25} {2:25} {3:25}".format("DisplayName", "NodeId", "BrowseName", "Value"))
        print("")

    variable_row = "{0}{1:30} {2!s:25} {3!s:25}, {4!s:3}"
    plain_row = "{0}{1:30} {2!s:25} {3!s:25}"
    for child in node.get_children_descriptions():
        if child.NodeClass == ua.NodeClass.Variable:
            # The value is read before any column is formatted.
            value = Node(node.server, child.NodeId).get_value()
            line = variable_row.format(indent,
                                       child.DisplayName.to_string(),
                                       child.NodeId.to_string(),
                                       child.BrowseName.to_string(),
                                       value)
        else:
            line = plain_row.format(indent,
                                    child.DisplayName.to_string(),
                                    child.NodeId.to_string(),
                                    child.BrowseName.to_string())
        print(line)
        if not depth:
            continue
        _lsprint_1(Node(node.server, child.NodeId), depth - 1, indent + " ")
319 
320 
def _lsprint_long(pnode, depth, indent=""):
    """Print a long listing of every child of *pnode*.

    For each child a fixed set of attributes is read in one request.
    Variables are printed with data type, server timestamp of the value and
    the value itself; other nodes only with display name, node id and browse
    name.  A column header is printed only on the outermost call (empty
    *indent*); while *depth* is non-zero each child is browsed recursively
    with one more level of indentation.
    """
    if not indent:
        print("{0:30} {1:25} {2:25} {3:10} {4:30} {5:25}".format("DisplayName", "NodeId", "BrowseName", "DataType", "Timestamp", "Value"))
        print("")
    for node in pnode.get_children():
        attrs = node.get_attributes([ua.AttributeIds.DisplayName,
                                     ua.AttributeIds.BrowseName,
                                     ua.AttributeIds.NodeClass,
                                     ua.AttributeIds.WriteMask,
                                     ua.AttributeIds.UserWriteMask,
                                     ua.AttributeIds.DataType,
                                     ua.AttributeIds.Value])
        # mask and umask are read but not displayed.
        name, bname, nclass, mask, umask, dtype, val = [attr.Value.Value for attr in attrs]
        # Value is requested last, so the last result carries its timestamp.
        update = attrs[-1].ServerTimestamp
        if nclass == ua.NodeClass.Variable:
            print("{0}{1:30} {2:25} {3:25} {4:10} {5!s:30} {6!s:25}".format(indent, name.to_string(), node.nodeid.to_string(), bname.to_string(), dtype.to_string(), update, val))
        else:
            # Same column order as the header and the variable row:
            # DisplayName, NodeId, BrowseName (the last two used to be swapped).
            print("{0}{1:30} {2:25} {3:25}".format(indent, name.to_string(), node.nodeid.to_string(), bname.to_string()))
        if depth:
            _lsprint_long(node, depth - 1, indent + " ")
341 
342 
class SubHandler(object):
    """Subscription handler that just prints every notification it receives."""

    def datachange_notification(self, node, val, data):
        """Print a data change notification with its node, value and data."""
        label = "New data change event"
        print(label, node, val, data)

    def event_notification(self, event):
        """Print an event notification."""
        label = "New event"
        print(label, event)
350 
351 
def uasubscribe():
    """Command line entry point: subscribe to a node and print notifications.

    ``-t/--eventtype`` chooses between data change and event subscriptions.
    Data change subscriptions require an explicit node; for event
    subscriptions an unspecified node is replaced by ``i=2253``.  The function
    then sleeps in an endless loop while the handler prints notifications;
    the client is disconnected when the loop is interrupted.
    """
    parser = argparse.ArgumentParser(description="Subscribe to a node and print results")
    add_common_args(parser)
    parser.add_argument("-t",
                        "--eventtype",
                        dest="eventtype",
                        default="datachange",
                        choices=['datachange', 'event'],
                        help="Event type to subscribe to")

    args = parse_args(parser, requirenodeid=False)
    if args.eventtype == "datachange":
        _require_nodeid(parser, args)
    else:
        # FIXME: this is broken, someone may have written i=84 on purpose
        if args.nodeid == "i=84" and args.path == "":
            # NOTE(review): i=2253 is presumably the Server object -- confirm.
            args.nodeid = "i=2253"

    client = Client(args.url, timeout=args.timeout)
    client.set_security_string(args.security)
    client.connect()
    try:
        node = get_node(client, args)
        handler = SubHandler()
        sub = client.create_subscription(500, handler)
        if args.eventtype == "datachange":
            sub.subscribe_data_change(node)
        else:
            sub.subscribe_events(node)
        print("Type Ctr-C to exit")
        while True:
            time.sleep(1)
    finally:
        client.disconnect()
    # Only reached if the loop above ever ends without an exception; the
    # former trailing print(args) could never run and has been removed.
    sys.exit(0)
388 
389 
def application_to_strings(app):
    """Describe an application description as a list of (label, value) pairs.

    The application URI always comes first.  Product URI, application name,
    application type, gateway server URI and discovery profile URI follow
    only when their value is truthy, then one entry per discovery URL.
    """
    result = [('Application URI', app.ApplicationUri)]
    optionals = [
        ('Product URI', app.ProductUri),
        ('Application Name', app.ApplicationName.to_string()),
        ('Application Type', str(app.ApplicationType)),
        ('Gateway Server URI', app.GatewayServerUri),
        ('Discovery Profile URI', app.DiscoveryProfileUri),
    ]
    result.extend((label, value) for (label, value) in optionals if value)
    result.extend(('Discovery URL', url) for url in app.DiscoveryUrls)
    return result
406 
407 
def cert_to_string(der):
    """Return a printable description of a DER encoded certificate.

    Empty or missing data yields ``'[no certificate]'``.  When the
    ``opcua.crypto.uacrypto`` module cannot be imported only the size in
    bytes is reported; otherwise the certificate is decoded and rendered by
    that module.
    """
    if der:
        try:
            from opcua.crypto import uacrypto
        except ImportError:
            return "{0} bytes".format(len(der))
        return uacrypto.x509_to_string(uacrypto.x509_from_der(der))
    return '[no certificate]'
417 
418 
def endpoint_to_strings(ep):
    """Describe an endpoint description as a list of (label, value) pairs.

    The list holds the endpoint URL, the entries produced by
    ``application_to_strings`` for ``ep.Server``, certificate and security
    settings, a group of entries per user identity token, and finally the
    transport profile URI and security level.
    """
    result = [('Endpoint URL', ep.EndpointUrl)]
    result += application_to_strings(ep.Server)
    result += [
        ('Server Certificate', cert_to_string(ep.ServerCertificate)),
        ('Security Mode', str(ep.SecurityMode)),
        ('Security Policy URI', ep.SecurityPolicyUri)]
    for tok in ep.UserIdentityTokens:
        result += [
            ('User policy', tok.PolicyId),
            (' Token type', str(tok.TokenType))]
        # Issued-token details are listed together as soon as either is set.
        if tok.IssuedTokenType or tok.IssuerEndpointUrl:
            result += [
                (' Issued Token type', tok.IssuedTokenType),
                (' Issuer Endpoint URL', tok.IssuerEndpointUrl)]
        if tok.SecurityPolicyUri:
            result.append((' Security Policy URI', tok.SecurityPolicyUri))
    result += [
        ('Transport Profile URI', ep.TransportProfileUri),
        ('Security Level', ep.SecurityLevel)]
    return result
440 
441 
def uaclient():
    """Command line entry point: connect to a server and open a Python shell.

    Optionally loads a client certificate and private key before connecting.
    The locals ``root``, ``objects`` and ``mynode`` are prepared for use from
    the shell, so their names must not change.  The client is always
    disconnected before the process exits with status 0.
    """
    parser = argparse.ArgumentParser(description="Connect to server and start python shell. root and objects nodes are available. Node specificed in command line is available as mynode variable")
    add_common_args(parser)
    parser.add_argument("-c", "--certificate", help="set client certificate")
    parser.add_argument("-k", "--private_key", help="set client private key")
    args = parse_args(parser)

    client = Client(args.url, timeout=args.timeout)
    client.set_security_string(args.security)
    if args.certificate:
        client.load_client_certificate(args.certificate)
    if args.private_key:
        client.load_private_key(args.private_key)

    client.connect()
    try:
        # Not used below on purpose: these exist for the interactive user.
        root = client.get_root_node()
        objects = client.get_objects_node()
        mynode = get_node(client, args)
        embed()
    finally:
        client.disconnect()
    sys.exit(0)
468 
469 
def uaserver():
    """Command line entry point: run an example OPC-UA server.

    The address space can be filled from an XML file (``-x``) and/or with a
    few sample nodes (``-p``).  After start-up the function either opens a
    Python shell (``-s``), keeps updating the sample variables once per
    second (``-p`` without ``-s``), or simply idles.  The server is always
    stopped before the process exits.
    """
    parser = argparse.ArgumentParser(description="Run an example OPC-UA server. By importing xml definition and using uawrite command line, it is even possible to expose real data using this server")
    # we setup a server, this is a bit different from other tool so we do not reuse common arguments
    parser.add_argument("-u",
                        "--url",
                        help="URL of OPC UA server, default is opc.tcp://0.0.0.0:4840",
                        default='opc.tcp://0.0.0.0:4840',
                        metavar="URL")
    parser.add_argument("-v",
                        "--verbose",
                        dest="loglevel",
                        choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
                        default='WARNING',
                        help="Set log level")
    parser.add_argument("-x",
                        "--xml",
                        metavar="XML_FILE",
                        help="Populate address space with nodes defined in XML")
    parser.add_argument("-p",
                        "--populate",
                        action="store_true",
                        help="Populate address space with some sample nodes")
    parser.add_argument("-c",
                        "--disable-clock",
                        action="store_true",
                        help="Disable clock, to avoid seeing many write if debugging an application")
    parser.add_argument("-s",
                        "--shell",
                        action="store_true",
                        help="Start python shell instead of randomly changing node values")
    parser.add_argument("--certificate",
                        help="set server certificate")
    parser.add_argument("--private_key",
                        help="set server private key")
    args = parser.parse_args()
    logging.basicConfig(format="%(levelname)s: %(message)s", level=getattr(logging, args.loglevel))

    server = Server()
    server.set_endpoint(args.url)
    if args.certificate:
        server.load_certificate(args.certificate)
    if args.private_key:
        server.load_private_key(args.private_key)
    server.disable_clock(args.disable_clock)
    server.set_server_name("FreeOpcUa Example Server")
    if args.xml:
        server.import_xml(args.xml)
    if args.populate:
        @uamethod
        def multiply(parent, x, y):
            print("multiply method call with parameters: ", x, y)
            return x * y

        uri = "http://examples.freeopcua.github.io"
        idx = server.register_namespace(uri)
        objects = server.get_objects_node()
        myobj = objects.add_object(idx, "MyObject")
        mywritablevar = myobj.add_variable(idx, "MyWritableVariable", 6.7)
        mywritablevar.set_writable()  # Set MyWritableVariable to be writable by clients
        myvar = myobj.add_variable(idx, "MyVariable", 6.7)
        myarrayvar = myobj.add_variable(idx, "MyVarArray", [6.7, 7.9])
        # myprop and mymethod are not used below; the names are kept so they
        # stay available in shell mode.
        myprop = myobj.add_property(idx, "MyProperty", "I am a property")
        mymethod = myobj.add_method(idx, "MyMethod", multiply, [ua.VariantType.Double, ua.VariantType.Int64], [ua.VariantType.Double])

    server.start()
    try:
        if args.shell:
            embed()
        elif args.populate:
            count = 0
            while True:
                time.sleep(1)
                # Float divisors keep this a smooth sine wave even where "/"
                # on two ints is integer division (Python 2).
                myvar.set_value(math.sin(count / 10.0))
                myarrayvar.set_value([math.sin(count / 10.0), math.sin(count / 100.0)])
                count += 1
        else:
            while True:
                time.sleep(1)
    finally:
        server.stop()
    sys.exit(0)
551 
552 
def uadiscover():
    """Command line entry point: print servers and endpoints known to a server.

    Always lists the result of FindServers followed by the server's
    endpoints.  With ``-n/--network`` a FindServersOnNetwork request is sent
    first; for its results only a numbered heading is printed.  Exits with
    status 0 when done.
    """
    parser = argparse.ArgumentParser(description="Performs OPC UA discovery and prints information on servers and endpoints.")
    add_minimum_args(parser)
    parser.add_argument("-n",
                        "--network",
                        action="store_true",
                        help="Also send a FindServersOnNetwork request to server")
    args = parse_args(parser)

    client = Client(args.url, timeout=args.timeout)

    if args.network:
        print("Performing discovery at {0}\n".format(args.url))
        for i, server in enumerate(client.connect_and_find_servers_on_network(), start=1):
            print('Server {0}:'.format(i))
            print('')

    print("Performing discovery at {0}\n".format(args.url))
    for i, server in enumerate(client.connect_and_find_servers(), start=1):
        print('Server {0}:'.format(i))
        for (n, v) in application_to_strings(server):
            print(' {0}: {1}'.format(n, v))
        print('')

    for i, ep in enumerate(client.connect_and_get_server_endpoints(), start=1):
        print('Endpoint {0}:'.format(i))
        for (n, v) in endpoint_to_strings(ep):
            print(' {0}: {1}'.format(n, v))
        print('')

    sys.exit(0)
594 
595 
def print_history(o):
    """Print a table of source timestamp, status and value for history data.

    Only ``ua.HistoryData`` objects are printed; any other object is ignored
    silently.
    """
    if isinstance(o, ua.HistoryData):
        print("{0:30} {1:10} {2}".format('Source timestamp', 'Status', 'Value'))
        for d in o.DataValues:
            print("{0:30} {1:10} {2}".format(str(d.SourceTimestamp), d.StatusCode.name, d.Value))
601 
602 
def str_to_datetime(s, default=None):
    """Parse a date/time string into a naive ``datetime``.

    An empty or missing *s* yields *default* when one is given, otherwise the
    current UTC time.  Accepted layouts are ``YYYY-MM-DD`` optionally followed
    by ``HH:MM``, ``HH:MM:SS`` or ``HH:MM:SS.ffffff``, separated from the date
    by either a space or the ISO ``T``.

    Returns None when *s* matches none of the layouts.
    """
    if not s:
        if default is not None:
            return default
        return datetime.utcnow()
    # The three original layouts come first so strings that parsed before
    # keep producing exactly the same result.
    formats = (
        "%Y-%m-%d",
        "%Y-%m-%d %H:%M",
        "%Y-%m-%d %H:%M:%S",
        "%Y-%m-%d %H:%M:%S.%f",
        "%Y-%m-%dT%H:%M",
        "%Y-%m-%dT%H:%M:%S",
        "%Y-%m-%dT%H:%M:%S.%f",
    )
    for fmt in formats:
        try:
            return datetime.strptime(s, fmt)
        except ValueError:
            pass
    return None
614 
615 
def uahistoryread():
    """Command line entry point: read and print the history of a node.

    The time window defaults to the last day up to now.  With ``-e/--events``
    the event history is read and each event printed; otherwise the raw data
    history is printed with ``print_history``.  ``-l/--limit`` is passed on
    as ``numvalues``.  The client is always disconnected before the process
    exits with status 0.
    """
    parser = argparse.ArgumentParser(description="Read history of a node")
    add_common_args(parser)
    parser.add_argument("--starttime",
                        default=None,
                        help="Start time, formatted as YYYY-MM-DD [HH:MM[:SS]]. Default: current time - one day")
    parser.add_argument("--endtime",
                        default=None,
                        help="End time, formatted as YYYY-MM-DD [HH:MM[:SS]]. Default: current time")
    parser.add_argument("-e",
                        "--events",
                        action="store_true",
                        help="Read event history instead of data change history")
    parser.add_argument("-l",
                        "--limit",
                        type=int,
                        default=10,
                        help="Maximum number of notfication to return")

    args = parse_args(parser, requirenodeid=True)

    client = Client(args.url, timeout=args.timeout)
    client.set_security_string(args.security)
    client.connect()
    try:
        node = get_node(client, args)
        starttime = str_to_datetime(args.starttime, datetime.utcnow() - timedelta(days=1))
        endtime = str_to_datetime(args.endtime, datetime.utcnow())
        print("Reading raw history of node {0} at {1}; start at {2}, end at {3}\n".format(node, args.url, starttime, endtime))
        if args.events:
            evs = node.read_event_history(starttime, endtime, numvalues=args.limit)
            for ev in evs:
                print(ev)
        else:
            print_history(node.read_raw_history(starttime, endtime, numvalues=args.limit))
    finally:
        client.disconnect()
    sys.exit(0)
654 
655 
def uacall():
    """Command line entry point: call a method of a node and print the result.

    The optional positional VALUE is converted with ``_val_to_variant`` and
    passed as the single argument; without it the method is called with no
    arguments.  The method is either the one whose node id identifier equals
    ``-m/--method`` or, when ``-m`` is omitted, the only method of the
    selected node.  The client is always disconnected before the process
    exits.

    Raises:
        ValueError: when ``-m`` is omitted and the node has no method or more
            than one method.
    """
    parser = argparse.ArgumentParser(description="Call method of a node")
    add_common_args(parser)
    parser.add_argument("-m",
                        "--method",
                        dest="method",
                        type=int,
                        default=None,
                        help="Set method to call. If not given then (single) method of the selected node is used.")
    parser.add_argument("-l",
                        "--list",
                        "--array",
                        dest="array",
                        default="guess",
                        choices=["guess", "true", "false"],
                        help="Value is an array")
    parser.add_argument("-t",
                        "--datatype",
                        dest="datatype",
                        default="guess",
                        choices=["guess", 'byte', 'sbyte', 'nodeid', 'expandednodeid', 'qualifiedname', 'browsename', 'string', 'float', 'double', 'int16', 'int32', "int64", 'uint16', 'uint32', 'uint64', "bool", "string", 'datetime', 'bytestring', 'xmlelement', 'statuscode', 'localizedtext'],
                        help="Data type to return")
    parser.add_argument("value",
                        help="Value to use for call to method, if any",
                        nargs="?",
                        metavar="VALUE")

    args = parse_args(parser, requirenodeid=True)

    client = Client(args.url, timeout=args.timeout)
    client.set_security_string(args.security)
    client.connect()
    try:
        node = get_node(client, args)
        # val must be a tuple in order to enable method calls without arguments
        if args.value is None:
            val = ()
        else:
            val = (_val_to_variant(args.value, args),)

        # determine method to call: Either explicitly given or automatically
        # select the method of the selected node.
        methods = node.get_methods()
        method_id = None

        if args.method is None:
            if len(methods) == 0:
                raise ValueError("No methods in selected node and no method given")
            elif len(methods) == 1:
                method_id = methods[0]
            else:
                # The count and the list are passed explicitly; unpacking the
                # list into format() fed a node to the "{0:d}" field and
                # failed before the message could be produced.
                raise ValueError("Selected node has {0:d} methods but no method given. Provide one of {1!s}".format(len(methods), methods))
        else:
            for m in methods:
                if m.nodeid.Identifier == args.method:
                    method_id = m.nodeid
                    break

        if method_id is None:
            # last resort: build a node id from the bare identifier
            method_id = ua.NodeId(identifier=args.method)

        result_variants = node.call_method(method_id, *val)
        print("resulting result_variants={0!s}".format(result_variants))
    finally:
        client.disconnect()
    # The former trailing print(args) could never run and has been removed.
    sys.exit(0)
def uaread()
Definition: tools.py:95
def add_minimum_args(parser)
Definition: tools.py:23
def _arg_to_variant(val, array, ptype, varianttype=None)
Definition: tools.py:144
def endpoint_to_strings(ep)
Definition: tools.py:419
def uahistoryread()
Definition: tools.py:616
def _lsprint_0(node, depth, indent="")
Definition: tools.py:296
def _require_nodeid(parser, args)
Definition: tools.py:65
def uawrite()
Definition: tools.py:219
def _lsprint_1(node, depth, indent="")
Definition: tools.py:306
def uacall()
Definition: tools.py:656
def get_node(client, args)
Definition: tools.py:84
def datachange_notification(self, node, val, data)
Definition: tools.py:345
def _lsprint_long(pnode, depth, indent="")
Definition: tools.py:321
def multiply(parent, x, y)
def _args_to_array(val, array)
Definition: tools.py:131
def print_history(o)
Definition: tools.py:596
def cert_to_string(der)
Definition: tools.py:408
def uasubscribe()
Definition: tools.py:352
def str_to_datetime(s, default=None)
Definition: tools.py:603
def event_notification(self, event)
Definition: tools.py:348
def uaclient()
Definition: tools.py:442
def uadiscover()
Definition: tools.py:553
def application_to_strings(app)
Definition: tools.py:390
def _arg_to_bool(val)
Definition: tools.py:140
def embed()
Definition: tools.py:13
def add_common_args(parser, default_node='i=84')
Definition: tools.py:42
def uaserver()
Definition: tools.py:470
def uals()
Definition: tools.py:259
def parse_args(parser, requirenodeid=False)
Definition: tools.py:73
def _val_to_variant(val, args)
Definition: tools.py:156


ros_opcua_impl_python_opcua
Author(s): Denis Štogl , Daniel Draper
autogenerated on Tue Jan 19 2021 03:12:44