tests_client.py
Go to the documentation of this file.
1 import unittest
2 
3 from opcua import Client
4 from opcua import Server
5 from opcua import ua
6 
7 from tests_subscriptions import SubscriptionTests
8 from tests_common import CommonTests, add_server_methods
9 from tests_xml import XmlTests
10 
# TCP port shared by the test server endpoint and both test clients below.
port_num1 = 48510
12 
13 
class TestClient(unittest.TestCase, CommonTests, SubscriptionTests):
    """Run the common tests from the client side.

    A server is started for the whole test class, together with two clients
    connected to it: ``clt`` (connects as ``admin``, also exposed as ``opc``)
    and ``ro_clt`` (connects without a user name, i.e. anonymously).
    Tests that can only be run on the client side must be defined in this
    class.
    """

    @classmethod
    def setUpClass(cls):
        """Start the server, then connect the admin and anonymous clients.

        unittest does not call ``tearDownClass`` when ``setUpClass`` raises,
        so anything already started is shut down here before the error is
        re-raised.
        """
        # start our own server
        cls.srv = Server()
        cls.srv.set_endpoint('opc.tcp://localhost:{0:d}'.format(port_num1))
        # NOTE(review): helper imported from tests_common; this call is
        # reconstructed from the file's imports -- confirm against upstream.
        add_server_methods(cls.srv)
        cls.srv.start()

        try:
            # start admin client
            # long timeout since travis (automated testing) can be really slow
            cls.clt = Client(
                'opc.tcp://admin@localhost:{0:d}'.format(port_num1),
                timeout=10)
            cls.clt.connect()
            cls.opc = cls.clt

            try:
                # start anonymous client
                cls.ro_clt = Client(
                    'opc.tcp://localhost:{0:d}'.format(port_num1))
                cls.ro_clt.connect()
            except Exception:
                # do not leave the admin client connected behind us
                cls.clt.disconnect()
                raise
        except Exception:
            # do not leave the server running (and the port bound)
            cls.srv.stop()
            raise

    @classmethod
    def tearDownClass(cls):
        """Disconnect both clients and stop the server.

        Every step runs even if an earlier one raises, so a failing
        disconnect cannot leave the server running.
        """
        try:
            # stop our clients
            cls.ro_clt.disconnect()
        finally:
            try:
                cls.clt.disconnect()
            finally:
                # stop the server
                cls.srv.stop()

    def test_service_fault(self):
        """A request carrying a bad type id raises UaStatusCodeError."""
        request = ua.ReadRequest()
        request.TypeId = ua.FourByteNodeId(999)  # bad type!
        with self.assertRaises(ua.UaStatusCodeError):
            self.clt.uaclient._uasocket.send_request(request)

    def test_objects_anonymous(self):
        """The anonymous client cannot modify the objects node."""
        objects = self.ro_clt.get_objects_node()
        with self.assertRaises(ua.UaStatusCodeError):
            objects.set_attribute(ua.AttributeIds.WriteMask, ua.DataValue(999))
        with self.assertRaises(ua.UaStatusCodeError):
            objects.add_folder(3, 'MyFolder')

    def test_folder_anonymous(self):
        """The anonymous client sees an admin-created folder but cannot add to it."""
        objects = self.clt.get_objects_node()
        f = objects.add_folder(3, 'MyFolderRO')
        f_ro = self.ro_clt.get_node(f.nodeid)
        self.assertEqual(f, f_ro)
        with self.assertRaises(ua.UaStatusCodeError):
            f_ro.add_folder(3, 'MyFolder2')

    def test_variable_anonymous(self):
        """The anonymous client can write a variable only while it is writable."""
        objects = self.clt.get_objects_node()
        v = objects.add_variable(3, 'MyROVariable', 6)
        v.set_value(4)  # this should work
        v_ro = self.ro_clt.get_node(v.nodeid)
        with self.assertRaises(ua.UaStatusCodeError):
            v_ro.set_value(2)
        self.assertEqual(v_ro.get_value(), 4)
        v.set_writable(True)
        v_ro.set_value(2)  # now it should work
        self.assertEqual(v_ro.get_value(), 2)
        v.set_writable(False)
        with self.assertRaises(ua.UaStatusCodeError):
            v_ro.set_value(9)
        self.assertEqual(v_ro.get_value(), 2)

    def test_context_manager(self):
        """Context manager calls connect() and disconnect()."""
        # one-element list so the nested function can mutate the counter
        state = [0]

        def increment_state(self, *args, **kwargs):
            state[0] += 1

        # create client and replace instance methods with dummy methods
        client = Client('opc.tcp://dummy_address:10000')
        client.connect = increment_state.__get__(client)
        client.disconnect = increment_state.__get__(client)

        # assertEqual rather than a bare assert, which is stripped under -O
        self.assertEqual(state[0], 0)
        with client:
            # test if client connected
            self.assertEqual(state[0], 1)
        # test if client disconnected
        self.assertEqual(state[0], 2)
def test_service_fault(self)
Definition: tests_client.py:47
def test_context_manager(self)
Definition: tests_client.py:84
def test_objects_anonymous(self)
Definition: tests_client.py:53
def test_folder_anonymous(self)
Definition: tests_client.py:60
def add_server_methods(srv)
Definition: tests_common.py:16
def test_variable_anonymous(self)
Definition: tests_client.py:68


ros_opcua_impl_python_opcua
Author(s): Denis Štogl, Daniel Draper
autogenerated on Tue Jan 19 2021 03:12:44