test_highlevel.py
Go to the documentation of this file.
00001 #! /usr/bin/env python
00002 
00003 import sys
00004 sys.path.insert(0, "../../build/bin/")
00005 import datetime
00006 import unittest
00007 from threading import Thread, Event
00008 try:
00009     from queue import Queue
00010 except ImportError:
00011     from Queue import Queue
00012 import time
00013 from threading import Condition
00014 import opcua
00015 
# TCP ports for the two servers used by this module: port_num1 is used by
# ServerProcess (client-side tests), port_num2 by TestServer (server-side
# tests).
port_num1, port_num2 = 48410, 48430
00018 
class SubHandler(opcua.SubscriptionHandler):
    '''
    Subscription handler that silently discards every notification.
    Used by tests that only create and delete subscriptions.
    '''

    def data_change(self, handle, node, val, attr):
        # data-change notifications are deliberately ignored
        return None

    def event(self, handle, event):
        # event notifications are deliberately ignored
        return None
00028 
class MySubHandler(opcua.SubscriptionHandler):
    '''
    Subscription handler that records the last notification it received and
    notifies a condition variable, so tests can wait for a callback.
    '''

    def setup(self):
        '''
        Clear the recorded notification and create a new condition variable.

        The callbacks below use self.cond, so this must be called before the
        handler receives any notification.

        Returns the Condition that is notified on every callback.
        '''
        self.cond = Condition()
        self.node = None
        self.handle = None
        self.attribute = None
        self.value = None
        self.ev = None
        return self.cond

    def data_change(self, handle, node, val, attr):
        '''
        Record a data-change notification and wake all waiting threads.
        '''
        # Write every field while holding the lock: a waiter inspecting the
        # handler under the same lock then never sees a half-updated record
        # (e.g. the new node together with the previous value).
        with self.cond:
            self.handle = handle
            self.node = node
            self.value = val
            self.attribute = attr
            self.cond.notify_all()

    def event(self, handle, event):
        '''
        Record an event notification and wake all waiting threads.
        '''
        with self.cond:
            self.ev = event
            self.cond.notify_all()
00054 
class Unit(unittest.TestCase):
    '''
    Simple unit tests that do not need a running server or client.
    '''

    def test_equal_nodeid(self):
        # two separately built ids compare equal but are distinct objects
        nid1 = opcua.NodeId(999, 2)
        nid2 = opcua.NodeId(999, 2)
        self.assertEqual(nid1, nid2)
        self.assertIsNot(nid1, nid2)

    def test_zero_nodeid(self):
        self.assertEqual(opcua.NodeId(), opcua.NodeId(0, 0))
        self.assertEqual(opcua.NodeId(), opcua.NodeId('ns=0;i=0;'))

    def test_string_nodeid(self):
        nid = opcua.NodeId('titi', 1)
        self.assertEqual(nid.namespace_index, 1)
        self.assertEqual(nid.identifier, 'titi')
        self.assertTrue(nid.is_string)

    def test_numeric_nodeid(self):
        nid = opcua.NodeId(999, 2)
        self.assertEqual(nid.namespace_index, 2)
        self.assertEqual(nid.identifier, 999)
        self.assertTrue(nid.is_integer)

    def test_qualifiedstring_nodeid(self):
        nid = opcua.NodeId('ns=2;s=PLC1.Manufacturer;')
        self.assertEqual(nid.namespace_index, 2)
        self.assertEqual(nid.identifier, 'PLC1.Manufacturer')

    def test_strrepr_nodeid(self):
        nid = opcua.NodeId('ns=2;s=PLC1.Manufacturer;')
        self.assertEqual(str(nid), 'ns=2;s=PLC1.Manufacturer;')
        self.assertEqual(repr(nid), 'ns=2;s=PLC1.Manufacturer;')

    def test_qualified_name(self):
        qn = opcua.QualifiedName('qname', 2)
        self.assertEqual(qn.namespace_index, 2)
        self.assertEqual(qn.name, 'qname')
        self.assertEqual(repr(qn), 'QualifiedName(2:qname)')
        self.assertEqual(repr(qn), str(qn))
        # the constructor is also called with (index, name) order here
        qn2 = opcua.QualifiedName(2, 'qname')
        self.assertEqual(qn, qn2)

    def test_datavalue(self):
        dv = opcua.DataValue(123)
        self.assertEqual(dv.value, 123)
        dv = opcua.DataValue('abc')
        self.assertEqual(dv.value, 'abc')
        tnow = int(time.time())
        dv.source_timestamp = opcua.DateTime.from_time_t(tnow)
        self.assertEqual(dv.source_timestamp.to_time_t(), tnow)
        dv = opcua.DataValue(True, opcua.VariantType.BOOLEAN)
        self.assertEqual(dv.value, True)
        # the value must come back as a real bool, not as an integer 1
        self.assertIsInstance(dv.value, bool)

    def test_application_description(self):
        ad = opcua.ApplicationDescription()
        self.assertEqual(ad.type, opcua.ApplicationType.Server)
        ad.discovery_urls = ['a', 'b', 'c']
        self.assertEqual(ad.discovery_urls, ['a', 'b', 'c'])

    def test_user_token_policy(self):
        utp = opcua.UserTokenPolicy()
        self.assertEqual(utp.token_type, opcua.UserTokenType.Anonymous)

    def test_endpoint_description(self):
        ed = opcua.EndpointDescription()
        self.assertEqual(ed.security_mode, opcua.MessageSecurityMode.Invalid)
        self.assertEqual(ed.security_level, 0)
        ed.server_description = opcua.ApplicationDescription()
        self.assertEqual(ed.user_identify_tokens, [])
        ed.user_identify_tokens = [opcua.UserTokenPolicy()] * 3
        self.assertEqual(len(ed.user_identify_tokens), 3)

    def test_reference_description(self):
        rd = opcua.ReferenceDescription()
        self.assertEqual(rd.browse_name, opcua.QualifiedName())
        self.assertEqual(rd.is_forward, False)
        self.assertEqual(rd.reference_type_id, opcua.NodeId())
        self.assertEqual(rd.target_node_class, opcua.NodeClass.Unspecified)
        self.assertEqual(rd.target_node_id, opcua.NodeId())
        self.assertEqual(rd.target_node_type_definition, opcua.NodeId())

    def test_attribute_valueid(self):
        avid = opcua.ReadValueId()
        self.assertEqual(avid.node_id, opcua.NodeId())
        self.assertEqual(avid.attribute_id, opcua.AttributeId())
        self.assertEqual(avid.index_range, '')
        self.assertEqual(avid.data_encoding, opcua.QualifiedName())

    def test_write_value(self):
        wv = opcua.WriteValue()
        self.assertEqual(wv.node_id, opcua.NodeId())
        self.assertEqual(wv.attribute_id, opcua.AttributeId())
        self.assertEqual(wv.index_range, '')
        self.assertEqual(wv.value.value, None)

    def test_datetime(self):
        epoch = datetime.datetime(1970, 1, 1)
        tnow1 = int(time.time())
        tnow = int((datetime.datetime.utcnow() - epoch).total_seconds())
        # The two clock reads can straddle a second boundary, so allow one
        # second of slack. A larger gap is a system error, not freeopcua.
        self.assertLessEqual(abs(tnow - tnow1), 1)

        dt = opcua.DateTime.from_time_t(tnow)
        self.assertEqual(tnow, dt.to_time_t())

        pydt = dt.to_datetime()
        self.assertEqual(tnow, int((pydt - epoch).total_seconds()))

        dt2 = opcua.DateTime(pydt)
        #self.assertEqual(dt2, dt) #FIXME: not implemented
        # Convert dt2 (not dt again) so the datetime-based constructor is
        # actually exercised by the round trip.
        pydt2 = dt2.to_datetime()
        self.assertEqual(pydt, pydt2)

    def test_localized_text(self):
        event = opcua.Event()
        txt = "This is string"
        event.message = txt  # message is of type LocalizedText
        self.assertEqual(txt, event.message)
00177 
00178 
00179 
class CommonTests(object):
    '''
    Tests that are run twice, once on the server side and once on the
    client side, since care has been taken to expose the exact same API on
    both sides.

    The concrete test class must provide self.opc (the object under test)
    and self.srv (an object with a trigger_event() method).
    '''

    @staticmethod
    def _wait_until(cond, predicate, timeout):
        '''
        Block until predicate() is true or timeout seconds have elapsed.

        The predicate is evaluated while holding cond, and is checked before
        the first wait, so a notification sent before this call is not lost.
        Works the same on Python 2 and 3 because it does not rely on the
        return value of Condition.wait().

        Returns True if the predicate became true, False on timeout.
        '''
        deadline = time.time() + timeout
        with cond:
            while not predicate():
                remaining = deadline - time.time()
                if remaining <= 0:
                    return False
                cond.wait(remaining)
        return True

    def test_root(self):
        root = self.opc.get_root_node()
        self.assertEqual(opcua.QualifiedName('Root', 0), root.get_browse_name())
        nid = opcua.NodeId(84, 0)
        self.assertEqual(nid, root.get_id())

    def test_objects(self):
        objects = self.opc.get_objects_node()
        self.assertEqual(opcua.QualifiedName('Objects', 0), objects.get_browse_name())
        nid = opcua.NodeId(85, 0)
        self.assertEqual(nid, objects.get_id())

    def test_add_nodes(self):
        objects = self.opc.get_objects_node()
        f = objects.add_folder(3, 'MyFolder')
        v = f.add_variable(3, 'MyVariable', 6)
        p = f.add_property(3, 'MyProperty', 10)
        childs = f.get_children()
        self.assertTrue(v in childs)
        self.assertTrue(p in childs)

    def test_add_numeric_variable(self):
        objects = self.opc.get_objects_node()
        v = objects.add_variable('ns=3;i=888;', '3:numericnodefromstring', 99)
        nid = opcua.NodeId(888, 3)
        qn = opcua.QualifiedName('numericnodefromstring', 3)
        self.assertEqual(nid, v.get_id())
        self.assertEqual(qn, v.get_browse_name())

    def test_add_string_variable(self):
        objects = self.opc.get_objects_node()
        v = objects.add_variable('ns=3;s=stringid;', '3:stringnodefromstring', [68])
        nid = opcua.NodeId('stringid', 3)
        qn = opcua.QualifiedName('stringnodefromstring', 3)
        self.assertEqual(nid, v.get_id())
        self.assertEqual(qn, v.get_browse_name())

    def test_add_string_array_variable(self):
        objects = self.opc.get_objects_node()
        v = objects.add_variable('ns=3;s=stringarrayid;', '9:stringarray', ['l', 'b'])
        nid = opcua.NodeId('stringarrayid', 3)
        qn = opcua.QualifiedName('stringarray', 9)
        self.assertEqual(nid, v.get_id())
        self.assertEqual(qn, v.get_browse_name())
        val = v.get_value()
        self.assertEqual(['l', 'b'], val)

    def test_add_numeric_node(self):
        objects = self.opc.get_objects_node()
        nid = opcua.NodeId(9999, 3)
        qn = opcua.QualifiedName('AddNodeVar1', 3)
        v1 = objects.add_variable(nid, qn, 0)
        self.assertEqual(nid, v1.get_id())
        self.assertEqual(qn, v1.get_browse_name())

    def test_add_string_node(self):
        objects = self.opc.get_objects_node()
        qn = opcua.QualifiedName('AddNodeVar2', 3)
        nid = opcua.NodeId('AddNodeVar2Id', 3)
        v2 = objects.add_variable(nid, qn, 0)
        self.assertEqual(nid, v2.get_id())
        self.assertEqual(qn, v2.get_browse_name())

    def test_add_find_node_(self):
        objects = self.opc.get_objects_node()
        o = objects.add_object('ns=2;i=101;', '2:AddFindObject')
        o2 = objects.get_child('2:AddFindObject')
        self.assertEqual(o, o2)

    def test_node_path(self):
        objects = self.opc.get_objects_node()
        o = objects.add_object('ns=2;i=105;', '2:NodePathObject')
        root = self.opc.get_root_node()
        o2 = root.get_child(['0:Objects', '2:NodePathObject'])
        self.assertEqual(o, o2)

    def test_add_read_node(self):
        objects = self.opc.get_objects_node()
        o = objects.add_object('ns=2;i=102;', '2:AddReadObject')
        nid = opcua.NodeId(102, 2)
        self.assertEqual(o.get_id(), nid)
        qn = opcua.QualifiedName('AddReadObject', 2)
        self.assertEqual(o.get_browse_name(), qn)

    def test_simple_value(self):
        o = self.opc.get_objects_node()
        v = o.add_variable(3, 'VariableTestValue', 4.32)
        val = v.get_value()
        self.assertEqual(4.32, val)

    def test_add_exception(self):
        objects = self.opc.get_objects_node()
        objects.add_object('ns=2;i=103;', '2:AddReadObject')
        # adding a second node with the same node id must fail
        with self.assertRaises(RuntimeError):
            objects.add_object('ns=2;i=103;', '2:AddReadObject')

    def test_negative_value(self):
        o = self.opc.get_objects_node()
        v = o.add_variable(3, 'VariableNegativeValue', 4)
        v.set_value(-4.54)
        val = v.get_value()
        self.assertEqual(-4.54, val)

    def test_read_server_state(self):
        statenode = self.opc.get_node(opcua.ObjectId.Server_ServerStatus_State)
        state = statenode.get_value()
        self.assertEqual(state, 0)

    def test_array_value(self):
        o = self.opc.get_objects_node()
        v = o.add_variable(3, 'VariableArrayValue', [1, 2, 3])
        val = v.get_value()
        self.assertEqual([1, 2, 3], val)

    def test_array_size_one_value(self):
        o = self.opc.get_objects_node()
        v = o.add_variable(3, 'VariableArrayValue', [1, 2, 3])
        v.set_value([1])
        val = v.get_value()
        self.assertEqual([1], val)

    def test_create_delete_subscription(self):
        o = self.opc.get_objects_node()
        v = o.add_variable(3, 'SubscriptionVariable', [1, 2, 3])
        # Use a local handler instead of a module global that only exists
        # when the file is run as a script; keep a reference to it for as
        # long as the subscription lives.
        handler = SubHandler()
        sub = self.opc.create_subscription(100, handler)
        handle = sub.subscribe_data_change(v)
        time.sleep(0.1)
        sub.unsubscribe(handle)
        sub.delete()

    def test_subscribe_events(self):
        handler = SubHandler()
        sub = self.opc.create_subscription(100, handler)
        handle = sub.subscribe_events()
        sub.unsubscribe(handle)
        sub.delete()

    def test_events(self):
        msclt = MySubHandler()
        cond = msclt.setup()
        sub = self.opc.create_subscription(100, msclt)
        handle = sub.subscribe_events()
        try:
            ev = opcua.Event()
            msg = "this is my msg "
            ev.message = msg
            tid = datetime.datetime.now()
            ev.time = tid
            ev.source_node = self.opc.get_server_node().get_id()
            ev.source_name = "our server node"
            ev.severity = 500
            self.srv.trigger_event(ev)

            # Bounded, predicate-based wait: it cannot miss a callback that
            # fired before we started waiting and cannot hang for hours.
            received = self._wait_until(cond, lambda: msclt.ev is not None, 10)
            self.assertTrue(received)  # timed out waiting for subscription callback
            self.assertIsNot(msclt.ev, None)  # we did not receive event
            self.assertEqual(msclt.ev.message, msg)
            self.assertEqual(msclt.ev.time.to_datetime(), tid)
            self.assertEqual(msclt.ev.severity, 500)
            self.assertEqual(msclt.ev.source_node, self.opc.get_server_node().get_id())
        finally:
            # release the subscription even when an assertion failed
            sub.unsubscribe(handle)
            sub.delete()

    def test_get_namespace_index(self):
        idx = self.opc.get_namespace_index('http://freeopcua.github.io')
        self.assertEqual(idx, 1)

    def test_use_namespace(self):
        root = self.opc.get_root_node()
        idx = self.opc.get_namespace_index('http://freeopcua.github.io')
        o = root.add_object(idx, 'test_namespace')
        self.assertEqual(idx, o.get_id().namespace_index)
        o2 = root.get_child('{}:test_namespace'.format(idx))
        self.assertEqual(o, o2)

    def test_non_existing_node(self):
        root = self.opc.get_root_node()
        with self.assertRaises(RuntimeError):
            root.get_child(['0:Objects', '0:Server', '0:nonexistingnode'])

    def test_subscription_data_change(self):
        '''
        test subscriptions. This is far too complicated for a unittest but, setting up subscriptions requires a lot of code, so when we first set it up, it is best to test as many things as possible
        '''
        msclt = MySubHandler()
        cond = msclt.setup()

        o = self.opc.get_objects_node()

        # subscribe to a variable
        startv1 = [1, 2, 3]
        v1 = o.add_variable(3, 'SubscriptionVariableV1', startv1)
        sub = self.opc.create_subscription(100, msclt)
        handle1 = sub.subscribe_data_change(v1)
        try:
            # Now check we get the start value
            got_start = self._wait_until(cond, lambda: msclt.value == startv1, 0.5)
            self.assertTrue(got_start)  # timed out waiting for subscription callback
            self.assertEqual(msclt.value, startv1)
            self.assertEqual(msclt.node, v1)

            # modify v1 and check we get value
            v1.set_value([5])
            got_update = self._wait_until(cond, lambda: msclt.value == [5], 0.5)
            self.assertTrue(got_update)  # timed out waiting for subscription callback
            self.assertEqual(msclt.node, v1)
            self.assertEqual(msclt.value, [5])
        finally:
            sub.unsubscribe(handle1)
            sub.delete()

    def test_get_node_by_nodeid(self):
        root = self.opc.get_root_node()
        server_time_node = root.get_child(['0:Objects', '0:Server', '0:ServerStatus', '0:CurrentTime'])
        correct = self.opc.get_node(opcua.ObjectId.Server_ServerStatus_CurrentTime)
        self.assertEqual(server_time_node, correct)

    def test_subscribe_server_time(self):
        msclt = MySubHandler()
        cond = msclt.setup()

        server_time_node = self.opc.get_node(opcua.ObjectId.Server_ServerStatus_CurrentTime)

        sub = self.opc.create_subscription(200, msclt)
        handle = sub.subscribe_data_change(server_time_node)
        try:
            notified = self._wait_until(cond, lambda: msclt.node is not None, 0.5)
            self.assertTrue(notified)  # timed out waiting for subscription callback
            self.assertEqual(msclt.node, server_time_node)
            # FIXME: Add test to verify server clock. How do I convert opcua.DataTime to python datetime?
        finally:
            sub.unsubscribe(handle)
            sub.delete()

    def test_datetime_read(self):
        time_node = self.opc.get_node(opcua.ObjectId.Server_ServerStatus_CurrentTime)
        dt = time_node.get_value()
        pydt = dt.to_datetime()
        utcnow = datetime.datetime.utcnow()
        delta = utcnow - pydt
        self.assertTrue(delta < datetime.timedelta(seconds=1))

    def test_datetime_write(self):
        now = datetime.datetime.now()
        objects = self.opc.get_objects_node()
        v1 = objects.add_variable(4, "test_datetime", now)
        tid = v1.get_value()
        self.assertEqual(now, tid.to_datetime())
00447 
00448 
00449 
00450 
class ServerProcess(Thread):
    '''
    Run an opcua.Server in a background thread.

    Despite its name this is a threading.Thread, not a separate process.
    The server is created, started and stopped inside run(); other threads
    talk to it through stop() and trigger_event().
    '''
    def __init__(self):
        Thread.__init__(self)
        self._exit = Event()    # set by stop() to end the run() loop
        self.started = Event()  # set once the start attempt has finished
        self._queue = Queue()   # events waiting to be triggered by run()

    def run(self):
        '''
        Start the server, trigger queued events until stop() is called,
        then stop the server.
        '''
        self.srv = opcua.Server()
        self.srv.set_endpoint('opc.tcp://localhost:%d' % port_num1)
        try:
            self.srv.start()
        finally:
            # Always release threads blocked on started.wait(), even when
            # start() raised; otherwise they would hang forever.
            self.started.set()
        try:
            while not self._exit.is_set():
                # waiting on the exit event is a sleep that stop() can cut short
                self._exit.wait(0.1)
                # trigger everything that piled up, not just one event per tick
                while not self._queue.empty():
                    self.srv.trigger_event(self._queue.get())
        finally:
            # stop the server even if triggering an event failed
            self.srv.stop()

    def stop(self):
        '''
        Ask the run() loop to finish; use join() to wait for it.
        '''
        self._exit.set()

    def trigger_event(self, ev):
        '''
        Queue ev so that the server thread triggers it on its next loop turn.
        '''
        self._queue.put(ev)
00478 
00479 
class TestClient(unittest.TestCase, CommonTests):
    '''
    Run common tests on client side.
    A server is needed for that, so one is run in a background
    ServerProcess thread and a client is connected to it.
    Tests that can only be run on client side must be defined here.
    '''
    @classmethod
    def setUpClass(cls):
        # Reuse the server created by the __main__ guard when the file is
        # run as a script; otherwise (test runner / import) create our own,
        # so the class does not depend on a global that may not exist.
        server = globals().get('globalserver')
        if server is None:
            server = ServerProcess()
        cls.srv = server
        cls.srv.start()
        cls.srv.started.wait()  # let it initialize

        # start client
        try:
            cls.clt = opcua.Client()
            cls.clt.connect('opc.tcp://localhost:%d' % port_num1)
        except Exception:
            # tearDownClass is not run when setUpClass fails, so stop the
            # server thread here before re-raising.
            cls.srv.stop()
            cls.srv.join()
            raise
        cls.opc = cls.clt

    @classmethod
    def tearDownClass(cls):
        try:
            cls.clt.disconnect()
        finally:
            # stop the server thread even if the disconnect failed
            cls.srv.stop()
            # wait for server to stop, otherwise we may try to start a
            # new one before this one is really stopped
            cls.srv.join()
00508 
00509 
class TestServer(unittest.TestCase, CommonTests):
    '''
    Server-side run of the common tests: the object under test is the
    opcua.Server itself.
    Tests that only make sense on the server side are defined here.
    '''
    @classmethod
    def setUpClass(cls):
        server = opcua.Server()
        cls.srv = server
        server.set_endpoint('opc.tcp://localhost:%d' % port_num2)
        server.start()
        # the common tests talk to the server directly
        cls.opc = server

    @classmethod
    def tearDownClass(cls):
        cls.srv.stop()

    def test_register_namespace(self):
        # a freshly registered uri must be found again under the same index
        uri = 'http://mycustom.namespace.com'
        registered = self.opc.register_namespace(uri)
        looked_up = self.opc.get_namespace_index(uri)
        self.assertEqual(registered, looked_up)

    def test_register_use_namespace(self):
        # a node added with a registered namespace index reports that index
        uri = 'http://my_very_custom.namespace.com'
        idx = self.opc.register_namespace(uri)
        myvar = self.opc.get_root_node().add_variable(idx, 'var_in_custom_namespace', [5])
        node_id = myvar.get_id()
        self.assertEqual(idx, node_id.namespace_index)
        #self.assertEqual(uri, node_id.namespace_uri) #FIXME: should return uri!!!
00538         self.assertEqual(idx, myid.namespace_index) 
00539         #self.assertEqual(uri, myid.namespace_uri) #FIXME: should return uri!!!
00540 
00541 
00542 
00543 
if __name__ == '__main__':
    # Script entry point. Both globals below are read by the test classes,
    # so they must exist before unittest.main() runs the tests.
    globalserver = ServerProcess() #server process will be started by client tests
    try:
        # shared no-op subscription handler used by the subscription tests
        sclt = SubHandler()
        unittest.main(verbosity=3)
    finally:
        # ask the server thread to exit its loop, whatever the test outcome
        globalserver.stop()
00551 


ros_opcua_impl_freeopcua
Author(s): Denis Štogl
autogenerated on Sat Jun 8 2019 18:24:57