test_external_server.py
Go to the documentation of this file.
1 """
2 Test an OPC-UA server with freeopcua python client
3 """
4 import logging
5 import sys
6 import unittest
7 from concurrent.futures import Future
8 from datetime import datetime
9 from datetime import timedelta
10 import time
11 
12 from opcua import ua
13 from opcua import Client
14 
15 
class MySubHandler(object):
    """
    Subscription handler that exposes the next notification through a Future,
    so a test can block until something arrives.

    Only the first notification received after construction (or after
    ``reset()``) is stored; later ones are ignored until ``reset()`` is
    called again.
    """

    def __init__(self):
        self.future = Future()

    def reset(self):
        """Install a fresh, unresolved Future for the next notification."""
        self.future = Future()

    def _resolve(self, payload):
        """Store ``payload`` in the current Future unless it already has a result."""
        # Calling set_result() on a completed Future raises InvalidStateError
        # on Python 3.8+, which would blow up in whatever code delivers the
        # notification. Keep the first result instead.
        if not self.future.done():
            self.future.set_result(payload)

    def datachange_notification(self, node, val, data):
        """Resolve the Future with the ``(node, val, data)`` tuple."""
        self._resolve((node, val, data))

    def event_notification(self, event):
        """Resolve the Future with ``event``."""
        self._resolve(event)
33 
34 
class MySubHandler2(object):
    """
    Subscription handler that accumulates every notification in ``results``.

    Data changes are stored as ``(node, val)`` tuples; events are stored as
    received. Entries are kept in arrival order.
    """

    def __init__(self):
        self.results = []

    def _record(self, entry):
        """Append ``entry`` to the list of collected notifications."""
        self.results.append(entry)

    def datachange_notification(self, node, val, data):
        """Record the ``(node, val)`` pair; ``data`` is not stored."""
        self._record((node, val))

    def event_notification(self, event):
        """Record ``event`` unchanged."""
        self._record(event)
44 
45 
def connect(func):
    """
    Decorator for test methods that need a connected client.

    The wrapped method is called as ``func(self, client)`` with a ``Client``
    created from the module-level ``URL``. ``client.disconnect()`` is always
    called afterwards, whether ``connect()`` or the test body raised or not.

    The returned wrapper takes only ``self``, which is the signature a test
    runner invokes test methods with.
    """
    def wrapper(self):
        # Build the client outside the try block: if construction fails there
        # is nothing to disconnect, and the original error must not be
        # replaced by an UnboundLocalError raised from the finally clause.
        client = Client(URL)
        try:
            client.connect()
            func(self, client)
        finally:
            client.disconnect()

    # Keep the test's own name and docstring for reports. The signature is
    # deliberately left as (self,), so functools.wraps is not used here.
    wrapper.__name__ = getattr(func, "__name__", wrapper.__name__)
    wrapper.__doc__ = getattr(func, "__doc__", None)
    return wrapper
55 
56 
class Tests(unittest.TestCase):
    """
    Compatibility checks run against an external OPC-UA server.

    Every test talks to the server addressed by the module-level ``URL``,
    which the ``__main__`` block of this script sets from ``sys.argv[1]``.
    Methods decorated with ``@connect`` receive an already connected client
    as their second argument.
    """

    # Seconds to wait for a subscription notification before the test fails
    # instead of blocking forever.
    NOTIFICATION_TIMEOUT = 10

    # NOTE(review): the ``def`` lines of test_connect, test_connect_basic256
    # and test_get_endpoints were missing from the listing this class was
    # recovered from. Their names are reconstructed from the bodies; confirm
    # them against the upstream file.

    def test_connect(self):
        """Open a plain connection and close it again."""
        c = Client(URL)
        c.connect()
        c.disconnect()

    def test_connect_basic256(self):
        """Connect after configuring a security string."""
        c = Client(URL)
        # NOTE(review): "XXXX" looks like a placeholder for real certificate /
        # key material; confirm whether this test is meant to be enabled.
        c.set_security_string("basic256,sign,XXXX")
        c.connect()
        c.disconnect()

    def test_find_servers(self):
        """The server must report at least one entry for find-servers."""
        c = Client(URL)
        res = c.connect_and_find_servers()
        self.assertGreater(len(res), 0)

    def test_get_endpoints(self):
        """The server must report at least one endpoint."""
        c = Client(URL)
        res = c.connect_and_get_server_endpoints()
        self.assertGreater(len(res), 0)

    @connect
    def test_get_root(self, client):
        """The root node's browse name is ``Root`` in namespace 0."""
        root = client.get_root_node()
        self.assertEqual(root.get_browse_name(), ua.QualifiedName("Root", 0))

    @connect
    def test_get_root_children(self, client):
        """The root node has more than two children."""
        root = client.get_root_node()
        childs = root.get_children()
        self.assertGreater(len(childs), 2)

    @connect
    def test_get_namespace_array(self, client):
        """The namespace array is not empty."""
        array = client.get_namespace_array()
        self.assertGreater(len(array), 0)

    @connect
    def test_get_server_node(self, client):
        """The server node's browse name is ``Server`` in namespace 0."""
        srv = client.get_server_node()
        self.assertEqual(srv.get_browse_name(), ua.QualifiedName("Server", 0))
        # Children check left disabled, as in the original:
        # childs = srv.get_children()
        # self.assertTrue(len(childs) > 4)

    @connect
    def test_browsepathtonodeid(self, client):
        """Resolve Objects/Server/ServerArray by browse path from the root."""
        root = client.get_root_node()
        node = root.get_child(["0:Objects", "0:Server", "0:ServerArray"])
        self.assertEqual(node.get_browse_name(), ua.QualifiedName("ServerArray", 0))

    @connect
    def test_subscribe_server_time(self, client):
        """Subscribe to the server's current time and wait for one notification."""
        msclt = MySubHandler()

        server_time_node = client.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus_CurrentTime))
        sub = client.create_subscription(200, msclt)
        # Nested try/finally so the monitored item and the subscription are
        # released even when the wait times out or an assertion fails.
        try:
            handle = sub.subscribe_data_change(server_time_node)
            try:
                # Bounded wait: a server that never sends a notification must
                # fail the test rather than hang the whole run.
                node, val, data = msclt.future.result(timeout=self.NOTIFICATION_TIMEOUT)
                self.assertEqual(node, server_time_node)
                delta = datetime.utcnow() - val
                print("Timedelta is ", delta)
                # Left disabled, as in the original; presumably because the
                # client and server clocks may differ - confirm before enabling.
                # self.assertTrue(delta < timedelta(seconds=2))
            finally:
                sub.unsubscribe(handle)
        finally:
            sub.delete()
125 
126 
if __name__ == "__main__":
    # Script entry point: the single required command-line argument is the
    # URL of the server under test. It is stored in the module-level URL that
    # the tests and the connect decorator read.
    logging.basicConfig(level=logging.WARNING)

    # FIXME add better arguments parsing with possibility to specify
    # username and password and encryption
    if len(sys.argv) < 2:
        for message in (
            "This script is meant to test compatibilty to a server with freeopcua python client library",
            "Usage: test_server.py url",
        ):
            print(message)
        sys.exit(1)

    URL = sys.argv[1]

    # Pass only the program name on, so unittest does not try to interpret
    # the URL as a test name.
    program_only = sys.argv[:1]
    unittest.main(verbosity=30, argv=program_only)
140 
def test_subscribe_server_time(self, client)
def test_get_namespace_array(self, client)
def test_get_server_node(self, client)
def test_get_root_children(self, client)
def test_browsepathtonodeid(self, client)
def datachange_notification(self, node, val, data)
def func(parent, variant)
def datachange_notification(self, node, val, data)


ros_opcua_impl_python_opcua
Author(s): Denis Štogl , Daniel Draper
autogenerated on Tue Jan 19 2021 03:12:44