tests_client.py
Go to the documentation of this file.
00001 import unittest
00002 
00003 from opcua import Client
00004 from opcua import Server
00005 from opcua import ua
00006 
00007 from tests_subscriptions import SubscriptionTests
00008 from tests_common import CommonTests, add_server_methods
00009 from tests_xml import XmlTests 
00010 
00011 port_num1 = 48510
00012 
00013 
00014 class TestClient(unittest.TestCase, CommonTests, SubscriptionTests):
00015 
00016     '''
00017     Run common tests on client side
00018     Of course we need a server so we start also start a server
00019     Tests that can only be run on client side must be defined  in this class
00020     '''
00021     @classmethod
00022     def setUpClass(cls):
00023         # start our own server
00024         cls.srv = Server()
00025         cls.srv.set_endpoint('opc.tcp://localhost:{0:d}'.format(port_num1))
00026         add_server_methods(cls.srv)
00027         cls.srv.start()
00028 
00029         # start admin client
00030         # long timeout since travis (automated testing) can be really slow
00031         cls.clt = Client('opc.tcp://admin@localhost:{0:d}'.format(port_num1), timeout=10)
00032         cls.clt.connect()
00033         cls.opc = cls.clt
00034 
00035         # start anonymous client
00036         cls.ro_clt = Client('opc.tcp://localhost:{0:d}'.format(port_num1))
00037         cls.ro_clt.connect()
00038 
00039     @classmethod
00040     def tearDownClass(cls):
00041         #stop our clients
00042         cls.ro_clt.disconnect()
00043         cls.clt.disconnect()
00044         # stop the server 
00045         cls.srv.stop()
00046 
00047     def test_service_fault(self):
00048         request = ua.ReadRequest()
00049         request.TypeId = ua.FourByteNodeId(999)  # bad type!
00050         with self.assertRaises(ua.UaStatusCodeError):
00051             self.clt.uaclient._uasocket.send_request(request)
00052 
00053     def test_objects_anonymous(self):
00054         objects = self.ro_clt.get_objects_node()
00055         with self.assertRaises(ua.UaStatusCodeError):
00056             objects.set_attribute(ua.AttributeIds.WriteMask, ua.DataValue(999))
00057         with self.assertRaises(ua.UaStatusCodeError):
00058             f = objects.add_folder(3, 'MyFolder')
00059 
00060     def test_folder_anonymous(self):
00061         objects = self.clt.get_objects_node()
00062         f = objects.add_folder(3, 'MyFolderRO')
00063         f_ro = self.ro_clt.get_node(f.nodeid)
00064         self.assertEqual(f, f_ro)
00065         with self.assertRaises(ua.UaStatusCodeError):
00066             f2 = f_ro.add_folder(3, 'MyFolder2')
00067 
00068     def test_variable_anonymous(self):
00069         objects = self.clt.get_objects_node()
00070         v = objects.add_variable(3, 'MyROVariable', 6)
00071         v.set_value(4) #this should work
00072         v_ro = self.ro_clt.get_node(v.nodeid)
00073         with self.assertRaises(ua.UaStatusCodeError):
00074             v_ro.set_value(2)
00075         self.assertEqual(v_ro.get_value(), 4)
00076         v.set_writable(True)
00077         v_ro.set_value(2) #now it should work
00078         self.assertEqual(v_ro.get_value(), 2)
00079         v.set_writable(False)
00080         with self.assertRaises(ua.UaStatusCodeError):
00081             v_ro.set_value(9)
00082         self.assertEqual(v_ro.get_value(), 2)
00083 
00084     def test_context_manager(self):
00085         """ Context manager calls connect() and disconnect()
00086         """
00087         state = [0]
00088         def increment_state(self, *args, **kwargs):
00089             state[0] += 1
00090 
00091         # create client and replace instance methods with dummy methods
00092         client = Client('opc.tcp://dummy_address:10000')
00093         client.connect    = increment_state.__get__(client)
00094         client.disconnect = increment_state.__get__(client)
00095 
00096         assert state[0] == 0
00097         with client:
00098             # test if client connected
00099             self.assertEqual(state[0], 1)
00100         # test if client disconnected
00101         self.assertEqual(state[0], 2)


ros_opcua_impl_python_opcua
Author(s): Denis Štogl , Daniel Draper
autogenerated on Sat Jun 8 2019 18:26:23