tests_common.py
Go to the documentation of this file.
00001 # encoding: utf-8
00002 from concurrent.futures import Future, TimeoutError
00003 import time
00004 from datetime import datetime
00005 from datetime import timedelta
00006 import math
00007 
00008 from opcua import ua
00009 from opcua import Node
00010 from opcua import uamethod
00011 from opcua import instantiate
00012 from opcua import copy_node
00013 from opcua.common import ua_utils
00014 
00015 
00016 def add_server_methods(srv):
00017     @uamethod
00018     def func(parent, value):
00019         return value * 2
00020 
00021     o = srv.get_objects_node()
00022     v = o.add_method(ua.NodeId("ServerMethod", 2), ua.QualifiedName('ServerMethod', 2), func, [ua.VariantType.Int64], [ua.VariantType.Int64])
00023 
00024     @uamethod
00025     def func2(parent, methodname, value):
00026         return math.sin(value)
00027 
00028     o = srv.get_objects_node()
00029     v = o.add_method(ua.NodeId("ServerMethodArray", 2), ua.QualifiedName('ServerMethodArray', 2), func2, [ua.VariantType.String, ua.VariantType.Int64], [ua.VariantType.Int64])
00030 
00031     @uamethod
00032     def func3(parent, mylist):
00033         return [i * 2 for i in mylist]
00034 
00035     o = srv.get_objects_node()
00036     v = o.add_method(ua.NodeId("ServerMethodArray2", 2), ua.QualifiedName('ServerMethodArray2', 2), func3, [ua.VariantType.Int64], [ua.VariantType.Int64])
00037 
00038 
00039 class CommonTests(object):
00040 
00041     '''
00042     Tests that will be run twice. Once on server side and once on
00043     client side since we have been carefull to have the exact
00044     same api on server and client side
00045     '''
00046     # jyst to avoid editor warnings
00047     opc = None
00048     assertEqual = lambda x, y: True
00049     assertIn = lambda x, y: True
00050 
00051     def test_find_servers(self):
00052         servers = self.opc.find_servers()
00053         # FIXME : finish
00054 
00055     def test_add_node_bad_args(self):
00056         obj = self.opc.get_objects_node()
00057 
00058         with self.assertRaises(TypeError):
00059             fold = obj.add_folder(1.2, "kk")
00060 
00061         with self.assertRaises(TypeError):
00062             fold = obj.add_folder(ua.UaError, "khjh")
00063 
00064         with self.assertRaises(ua.UaError):
00065             fold = obj.add_folder("kjk", 1.2)
00066 
00067         with self.assertRaises(TypeError):
00068             fold = obj.add_folder("i=0;s='oooo'", 1.2)
00069 
00070         with self.assertRaises(ua.UaError):
00071             fold = obj.add_folder("i=0;s='oooo'", "tt:oioi")
00072 
00073     def test_delete_nodes(self):
00074         obj = self.opc.get_objects_node()
00075         fold = obj.add_folder(2, "FolderToDelete")
00076         var = fold.add_variable(2, "VarToDelete", 9.1)
00077         childs = fold.get_children()
00078         self.assertIn(var, childs)
00079         self.opc.delete_nodes([var])
00080         with self.assertRaises(ua.UaStatusCodeError):
00081             var.set_value(7.8)
00082         with self.assertRaises(ua.UaStatusCodeError):
00083             obj.get_child(["2:FolderToDelete", "2:VarToDelete"])
00084         childs = fold.get_children()
00085         self.assertNotIn(var, childs)
00086 
00087     def test_delete_nodes_recursive(self):
00088         obj = self.opc.get_objects_node()
00089         fold = obj.add_folder(2, "FolderToDeleteR")
00090         var = fold.add_variable(2, "VarToDeleteR", 9.1)
00091         self.opc.delete_nodes([fold, var])
00092         with self.assertRaises(ua.UaStatusCodeError):
00093             var.set_value(7.8)
00094         with self.assertRaises(ua.UaStatusCodeError):
00095             obj.get_child(["2:FolderToDelete", "2:VarToDelete"])
00096 
00097     def test_delete_nodes_recursive2(self):
00098         obj = self.opc.get_objects_node()
00099         fold = obj.add_folder(2, "FolderToDeleteRoot")
00100         nfold = fold
00101         mynodes = []
00102         for i in range(7):
00103             nfold = fold.add_folder(2, "FolderToDeleteRoot")
00104             var = fold.add_variable(2, "VarToDeleteR", 9.1)
00105             var = fold.add_property(2, "ProToDeleteR", 9.1)
00106             prop = fold.add_property(2, "ProToDeleteR", 9.1)
00107             o = fold.add_object(3, "ObjToDeleteR")
00108             mynodes.append(nfold)
00109             mynodes.append(var)
00110             mynodes.append(prop)
00111             mynodes.append(o)
00112         self.opc.delete_nodes([fold], recursive=True)
00113         for node in mynodes:
00114             with self.assertRaises(ua.UaStatusCodeError):
00115                 node.get_browse_name()
00116 
00117     def test_server_node(self):
00118         node = self.opc.get_server_node()
00119         self.assertEqual(ua.QualifiedName('Server', 0), node.get_browse_name())
00120 
00121     def test_root(self):
00122         root = self.opc.get_root_node()
00123         self.assertEqual(ua.QualifiedName('Root', 0), root.get_browse_name())
00124         self.assertEqual(ua.LocalizedText('Root'), root.get_display_name())
00125         nid = ua.NodeId(84, 0)
00126         self.assertEqual(nid, root.nodeid)
00127 
00128     def test_objects(self):
00129         objects = self.opc.get_objects_node()
00130         self.assertEqual(ua.QualifiedName('Objects', 0), objects.get_browse_name())
00131         nid = ua.NodeId(85, 0)
00132         self.assertEqual(nid, objects.nodeid)
00133 
00134     def test_browse(self):
00135         objects = self.opc.get_objects_node()
00136         obj = objects.add_object(4, "browsetest")
00137         folder = obj.add_folder(4, "folder")
00138         prop = obj.add_property(4, "property", 1)
00139         prop2 = obj.add_property(4, "property2", 2)
00140         var = obj.add_variable(4, "variable", 3)
00141         obj2 = obj.add_object(4, "obj")
00142         alle = obj.get_children()
00143         self.assertTrue(prop in alle)
00144         self.assertTrue(prop2 in alle)
00145         self.assertTrue(var in alle)
00146         self.assertTrue(folder in alle)
00147         self.assertFalse(obj in alle)
00148         props = obj.get_children(refs=ua.ObjectIds.HasProperty)
00149         self.assertTrue(prop in props)
00150         self.assertTrue(prop2 in props)
00151         self.assertFalse(var in props)
00152         self.assertFalse(folder in props)
00153         self.assertFalse(obj2 in props)
00154         all_vars = obj.get_children(nodeclassmask=ua.NodeClass.Variable)
00155         self.assertTrue(prop in all_vars)
00156         self.assertTrue(var in all_vars)
00157         self.assertFalse(folder in props)
00158         self.assertFalse(obj2 in props)
00159         all_objs = obj.get_children(nodeclassmask=ua.NodeClass.Object)
00160         self.assertTrue(folder in all_objs)
00161         self.assertTrue(obj2 in all_objs)
00162         self.assertFalse(var in all_objs)
00163 
00164     def test_browse_references(self):
00165         objects = self.opc.get_objects_node()
00166         folder = objects.add_folder(4, "folder")
00167 
00168         childs = objects.get_referenced_nodes(refs=ua.ObjectIds.Organizes, direction=ua.BrowseDirection.Forward, includesubtypes=False)
00169         self.assertTrue(folder in childs)
00170 
00171         childs = objects.get_referenced_nodes(refs=ua.ObjectIds.Organizes, direction=ua.BrowseDirection.Both, includesubtypes=False)
00172         self.assertTrue(folder in childs)
00173 
00174         childs = objects.get_referenced_nodes(refs=ua.ObjectIds.Organizes, direction=ua.BrowseDirection.Inverse, includesubtypes=False)
00175         self.assertFalse(folder in childs)
00176 
00177         parents = folder.get_referenced_nodes(refs=ua.ObjectIds.Organizes, direction=ua.BrowseDirection.Inverse, includesubtypes=False)
00178         self.assertTrue(objects in parents)
00179 
00180         parents = folder.get_referenced_nodes(refs=ua.ObjectIds.HierarchicalReferences, direction=ua.BrowseDirection.Inverse, includesubtypes=False)
00181         self.assertTrue(objects in parents)
00182 
00183         parent = folder.get_parent()
00184         self.assertEqual(parent, objects)
00185 
00186     def test_browsename_with_spaces(self):
00187         o = self.opc.get_objects_node()
00188         v = o.add_variable(3, 'BNVariable with spaces and %&+?/', 1.3)
00189         v2 = o.get_child("3:BNVariable with spaces and %&+?/")
00190         self.assertEqual(v, v2)
00191 
00192     def test_non_existing_path(self):
00193         root = self.opc.get_root_node()
00194         with self.assertRaises(ua.UaStatusCodeError):
00195             server_time_node = root.get_child(['0:Objects', '0:Server', '0:nonexistingnode'])
00196 
00197     def test_bad_attribute(self):
00198         root = self.opc.get_root_node()
00199         with self.assertRaises(ua.UaStatusCodeError):
00200             root.set_value(99)
00201 
00202     def test_get_node_by_nodeid(self):
00203         root = self.opc.get_root_node()
00204         server_time_node = root.get_child(['0:Objects', '0:Server', '0:ServerStatus', '0:CurrentTime'])
00205         correct = self.opc.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus_CurrentTime))
00206         self.assertEqual(server_time_node, correct)
00207 
00208     def test_datetime_read(self):
00209         time_node = self.opc.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus_CurrentTime))
00210         dt = time_node.get_value()
00211         utcnow = datetime.utcnow()
00212         delta = utcnow - dt
00213         self.assertTrue(delta < timedelta(seconds=1))
00214 
00215     def test_datetime_write(self):
00216         time_node = self.opc.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus_CurrentTime))
00217         now = datetime.utcnow()
00218         objects = self.opc.get_objects_node()
00219         v1 = objects.add_variable(4, "test_datetime", now)
00220         tid = v1.get_value()
00221         self.assertEqual(now, tid)
00222 
00223     def test_variant_array_dim(self):
00224         objects = self.opc.get_objects_node()
00225         l = [[[1.0, 1.0, 1.0, 1.0], [2.0, 2.0, 2.0, 2.0], [3.0, 3.0, 3.0, 3.0]],[[5.0, 5.0, 5.0, 5.0], [7.0, 8.0, 9.0, 01.0], [1.0, 1.0, 1.0, 1.0]]]
00226         v = objects.add_variable(3, 'variableWithDims', l)
00227 
00228         v.set_array_dimensions([0, 0, 0])
00229         dim = v.get_array_dimensions()
00230         self.assertEqual(dim, [0, 0, 0])
00231 
00232         v.set_value_rank(0)
00233         rank = v.get_value_rank()
00234         self.assertEqual(rank, 0)
00235 
00236         v2 = v.get_value()
00237         self.assertEqual(v2, l)
00238         dv = v.get_data_value()
00239         self.assertEqual(dv.Value.Dimensions, [2,3,4])
00240 
00241         l = [[[], [], []], [[], [], []]]
00242         variant = ua.Variant(l, ua.VariantType.UInt32)
00243         v = objects.add_variable(3, 'variableWithDimsEmpty', variant)
00244         v2 = v.get_value()
00245         self.assertEqual(v2, l)
00246         dv = v.get_data_value()
00247         self.assertEqual(dv.Value.Dimensions, [2,3,0])
00248 
00249     def test_add_numeric_variable(self):
00250         objects = self.opc.get_objects_node()
00251         v = objects.add_variable('ns=3;i=888;', '3:numericnodefromstring', 99)
00252         nid = ua.NodeId(888, 3)
00253         qn = ua.QualifiedName('numericnodefromstring', 3)
00254         self.assertEqual(nid, v.nodeid)
00255         self.assertEqual(qn, v.get_browse_name())
00256 
00257     def test_add_string_variable(self):
00258         objects = self.opc.get_objects_node()
00259         v = objects.add_variable('ns=3;s=stringid;', '3:stringnodefromstring', [68])
00260         nid = ua.NodeId('stringid', 3)
00261         qn = ua.QualifiedName('stringnodefromstring', 3)
00262         self.assertEqual(nid, v.nodeid)
00263         self.assertEqual(qn, v.get_browse_name())
00264 
00265     def test_utf8(self):
00266         objects = self.opc.get_objects_node()
00267         utf_string = "æøå@%&"
00268         bn = ua.QualifiedName(utf_string, 3)
00269         nid = ua.NodeId("æølå", 3)
00270         val = "æøå"
00271         v = objects.add_variable(nid, bn, val)
00272         self.assertEqual(nid, v.nodeid)
00273         val2 = v.get_value()
00274         self.assertEqual(val, val2)
00275         bn2 = v.get_browse_name()
00276         self.assertEqual(bn, bn2)
00277 
00278     def test_null_variable(self):
00279         objects = self.opc.get_objects_node()
00280         var = objects.add_variable(3, 'nullstring', "a string")
00281         var.set_value(None)
00282         val = var.get_value()
00283         self.assertEqual(val, None)
00284         var.set_value("")
00285         val = var.get_value()
00286         self.assertNotEqual(val, None)
00287         self.assertEqual(val, "")
00288 
00289     def test_variable_data_type(self):
00290         objects = self.opc.get_objects_node()
00291         var = objects.add_variable(3, 'stringfordatatype', "a string")
00292         val = var.get_data_type_as_variant_type()
00293         self.assertEqual(val, ua.VariantType.String)
00294         var = objects.add_variable(3, 'stringarrayfordatatype', ["a", "b"])
00295         val = var.get_data_type_as_variant_type()
00296         self.assertEqual(val, ua.VariantType.String)
00297 
00298     def test_add_string_array_variable(self):
00299         objects = self.opc.get_objects_node()
00300         v = objects.add_variable('ns=3;s=stringarrayid;', '9:stringarray', ['l', 'b'])
00301         nid = ua.NodeId('stringarrayid', 3)
00302         qn = ua.QualifiedName('stringarray', 9)
00303         self.assertEqual(nid, v.nodeid)
00304         self.assertEqual(qn, v.get_browse_name())
00305         val = v.get_value()
00306         self.assertEqual(['l', 'b'], val)
00307 
00308     def test_add_numeric_node(self):
00309         objects = self.opc.get_objects_node()
00310         nid = ua.NodeId(9999, 3)
00311         qn = ua.QualifiedName('AddNodeVar1', 3)
00312         v1 = objects.add_variable(nid, qn, 0)
00313         self.assertEqual(nid, v1.nodeid)
00314         self.assertEqual(qn, v1.get_browse_name())
00315 
00316     def test_add_string_node(self):
00317         objects = self.opc.get_objects_node()
00318         qn = ua.QualifiedName('AddNodeVar2', 3)
00319         nid = ua.NodeId('AddNodeVar2Id', 3)
00320         v2 = objects.add_variable(nid, qn, 0)
00321         self.assertEqual(nid, v2.nodeid)
00322         self.assertEqual(qn, v2.get_browse_name())
00323 
00324     def test_add_find_node_(self):
00325         objects = self.opc.get_objects_node()
00326         o = objects.add_object('ns=2;i=101;', '2:AddFindObject')
00327         o2 = objects.get_child('2:AddFindObject')
00328         self.assertEqual(o, o2)
00329 
00330     def test_node_path(self):
00331         objects = self.opc.get_objects_node()
00332         o = objects.add_object('ns=2;i=105;', '2:NodePathObject')
00333         root = self.opc.get_root_node()
00334         o2 = root.get_child(['0:Objects', '2:NodePathObject'])
00335         self.assertEqual(o, o2)
00336 
00337     def test_add_read_node(self):
00338         objects = self.opc.get_objects_node()
00339         o = objects.add_object('ns=2;i=102;', '2:AddReadObject')
00340         nid = ua.NodeId(102, 2)
00341         self.assertEqual(o.nodeid, nid)
00342         qn = ua.QualifiedName('AddReadObject', 2)
00343         self.assertEqual(o.get_browse_name(), qn)
00344 
00345     def test_simple_value(self):
00346         o = self.opc.get_objects_node()
00347         v = o.add_variable(3, 'VariableTestValue', 4.32)
00348         val = v.get_value()
00349         self.assertEqual(4.32, val)
00350 
00351     def test_add_exception(self):
00352         objects = self.opc.get_objects_node()
00353         o = objects.add_object('ns=2;i=103;', '2:AddReadObject')
00354         with self.assertRaises(ua.UaStatusCodeError):
00355             o2 = objects.add_object('ns=2;i=103;', '2:AddReadObject')
00356 
00357     def test_negative_value(self):
00358         o = self.opc.get_objects_node()
00359         v = o.add_variable(3, 'VariableNegativeValue', 4)
00360         v.set_value(-4.54)
00361         val = v.get_value()
00362         self.assertEqual(-4.54, val)
00363 
00364     def test_read_server_state(self):
00365         statenode = self.opc.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus_State))
00366         state = statenode.get_value()
00367         self.assertEqual(state, 0)
00368 
00369     def test_bad_node(self):
00370         bad = self.opc.get_node(ua.NodeId(999, 999))
00371         with self.assertRaises(ua.UaStatusCodeError):
00372             bad.get_browse_name()
00373         with self.assertRaises(ua.UaStatusCodeError):
00374             bad.set_value(89)
00375         with self.assertRaises(ua.UaStatusCodeError):
00376             bad.add_object(0, "0:myobj")
00377         with self.assertRaises(ua.UaStatusCodeError):
00378             bad.get_child("0:myobj")
00379 
00380     def test_value(self):
00381         o = self.opc.get_objects_node()
00382         var = ua.Variant(1.98, ua.VariantType.Double)
00383         v = o.add_variable(3, 'VariableValue', var)
00384         val = v.get_value()
00385         self.assertEqual(1.98, val)
00386 
00387         dvar = ua.DataValue(var)
00388         dv = v.get_data_value()
00389         self.assertEqual(ua.DataValue, type(dv))
00390         self.assertEqual(dvar.Value, dv.Value)
00391         self.assertEqual(dvar.Value, var)
00392 
00393     def test_set_value(self):
00394         o = self.opc.get_objects_node()
00395         var = ua.Variant(1.98, ua.VariantType.Double)
00396         dvar = ua.DataValue(var)
00397         v = o.add_variable(3, 'VariableValue', var)
00398         v.set_value(var.Value)
00399         v1 = v.get_value()
00400         self.assertEqual(v1, var.Value)
00401         v.set_value(var)
00402         v2 = v.get_value()
00403         self.assertEqual(v2, var.Value)
00404         v.set_data_value(dvar)
00405         v3 = v.get_data_value()
00406         self.assertEqual(v3.Value, dvar.Value)
00407 
00408     def test_array_value(self):
00409         o = self.opc.get_objects_node()
00410         v = o.add_variable(3, 'VariableArrayValue', [1, 2, 3])
00411         val = v.get_value()
00412         self.assertEqual([1, 2, 3], val)
00413 
00414     def test_bool_variable(self):
00415         o = self.opc.get_objects_node()
00416         v = o.add_variable(3, 'BoolVariable', True)
00417         dt = v.get_data_type_as_variant_type()
00418         self.assertEqual(dt, ua.VariantType.Boolean)
00419         val = v.get_value()
00420         self.assertEqual(True, val)
00421         v.set_value(False)
00422         val = v.get_value()
00423         self.assertEqual(False, val)
00424 
00425     def test_array_size_one_value(self):
00426         o = self.opc.get_objects_node()
00427         v = o.add_variable(3, 'VariableArrayValue', [1, 2, 3])
00428         v.set_value([1])
00429         val = v.get_value()
00430         self.assertEqual([1], val)
00431 
00432     def test_use_namespace(self):
00433         idx = self.opc.get_namespace_index("urn:freeopcua:python:server")
00434         self.assertEqual(idx, 1)
00435         root = self.opc.get_root_node()
00436         myvar = root.add_variable(idx, 'var_in_custom_namespace', [5])
00437         myid = myvar.nodeid
00438         self.assertEqual(idx, myid.NamespaceIndex)
00439 
00440     def test_method(self):
00441         o = self.opc.get_objects_node()
00442         m = o.get_child("2:ServerMethod")
00443         result = o.call_method("2:ServerMethod", 2.1)
00444         self.assertEqual(result, 4.2)
00445         with self.assertRaises(ua.UaStatusCodeError):
00446             # FIXME: we should raise a more precise exception
00447             result = o.call_method("2:ServerMethod", 2.1, 89, 9)
00448         with self.assertRaises(ua.UaStatusCodeError):
00449             result = o.call_method(ua.NodeId(999), 2.1)  # non existing method
00450 
00451     def test_method_array(self):
00452         o = self.opc.get_objects_node()
00453         m = o.get_child("2:ServerMethodArray")
00454         result = o.call_method(m, "sin", ua.Variant(math.pi))
00455         self.assertTrue(result < 0.01)
00456 
00457     def test_method_array2(self):
00458         o = self.opc.get_objects_node()
00459         m = o.get_child("2:ServerMethodArray2")
00460         result = o.call_method(m, [1.1, 3.4, 9])
00461         self.assertEqual(result, [2.2, 6.8, 18])
00462 
00463     def test_add_nodes(self):
00464         objects = self.opc.get_objects_node()
00465         f = objects.add_folder(3, 'MyFolder')
00466         child = objects.get_child("3:MyFolder")
00467         self.assertEqual(child, f)
00468         o = f.add_object(3, 'MyObject')
00469         child = f.get_child("3:MyObject")
00470         self.assertEqual(child, o)
00471         v = f.add_variable(3, 'MyVariable', 6)
00472         child = f.get_child("3:MyVariable")
00473         self.assertEqual(child, v)
00474         p = f.add_property(3, 'MyProperty', 10)
00475         child = f.get_child("3:MyProperty")
00476         self.assertEqual(child, p)
00477         childs = f.get_children()
00478         self.assertTrue(o in childs)
00479         self.assertTrue(v in childs)
00480         self.assertTrue(p in childs)
00481 
00482     def test_incl_subtypes(self):
00483         base_type = self.opc.get_root_node().get_child(["0:Types", "0:ObjectTypes", "0:BaseObjectType"])
00484         descs = base_type.get_children_descriptions(includesubtypes=True)
00485         self.assertTrue(len(descs) > 10)
00486         descs = base_type.get_children_descriptions(includesubtypes=False)
00487         self.assertEqual(len(descs), 0)
00488 
00489     def test_add_node_with_type(self):
00490         objects = self.opc.get_objects_node()
00491         f = objects.add_folder(3, 'MyFolder_TypeTest')
00492 
00493         o = f.add_object(3, 'MyObject1', ua.ObjectIds.BaseObjectType)
00494         self.assertEqual(o.get_type_definition().Identifier, ua.ObjectIds.BaseObjectType)
00495 
00496         o = f.add_object(3, 'MyObject2', ua.NodeId(ua.ObjectIds.BaseObjectType, 0))
00497         self.assertEqual(o.get_type_definition().Identifier, ua.ObjectIds.BaseObjectType)
00498 
00499         base_otype= self.opc.get_node(ua.ObjectIds.BaseObjectType)
00500         custom_otype = base_otype.add_object_type(2, 'MyFooObjectType')
00501 
00502         o = f.add_object(3, 'MyObject3', custom_otype.nodeid)
00503         self.assertEqual(o.get_type_definition().Identifier, custom_otype.nodeid.Identifier)
00504 
00505         references = o.get_references(refs=ua.ObjectIds.HasTypeDefinition, direction=ua.BrowseDirection.Forward)
00506         self.assertEqual(len(references), 1)
00507         self.assertEqual(references[0].NodeId, custom_otype.nodeid)
00508 
00509     def test_references_for_added_nodes(self):
00510         objects = self.opc.get_objects_node()
00511         o = objects.add_object(3, 'MyObject')
00512         nodes = objects.get_referenced_nodes(refs=ua.ObjectIds.Organizes, direction=ua.BrowseDirection.Forward, includesubtypes=False)
00513         self.assertTrue(o in nodes)
00514         nodes = o.get_referenced_nodes(refs=ua.ObjectIds.Organizes, direction=ua.BrowseDirection.Inverse, includesubtypes=False)
00515         self.assertTrue(objects in nodes)
00516         self.assertEqual(o.get_parent(), objects)
00517         self.assertEqual(o.get_type_definition().Identifier, ua.ObjectIds.BaseObjectType)
00518 
00519         o2 = o.add_object(3, 'MySecondObject')
00520         nodes = o.get_referenced_nodes(refs=ua.ObjectIds.HasComponent, direction=ua.BrowseDirection.Forward, includesubtypes=False)
00521         self.assertTrue(o2 in nodes)
00522         nodes = o2.get_referenced_nodes(refs=ua.ObjectIds.HasComponent, direction=ua.BrowseDirection.Inverse, includesubtypes=False)
00523         self.assertTrue(o in nodes)
00524         self.assertEqual(o2.get_parent(), o)
00525         self.assertEqual(o2.get_type_definition().Identifier, ua.ObjectIds.BaseObjectType)
00526 
00527         v = o.add_variable(3, 'MyVariable', 6)
00528         nodes = o.get_referenced_nodes(refs=ua.ObjectIds.HasComponent, direction=ua.BrowseDirection.Forward, includesubtypes=False)
00529         self.assertTrue(v in nodes)
00530         nodes = v.get_referenced_nodes(refs=ua.ObjectIds.HasComponent, direction=ua.BrowseDirection.Inverse, includesubtypes=False)
00531         self.assertTrue(o in nodes)
00532         self.assertEqual(v.get_parent(), o)
00533         self.assertEqual(v.get_type_definition().Identifier, ua.ObjectIds.BaseDataVariableType)
00534 
00535         p = o.add_property(3, 'MyProperty', 2)
00536         nodes = o.get_referenced_nodes(refs=ua.ObjectIds.HasProperty, direction=ua.BrowseDirection.Forward, includesubtypes=False)
00537         self.assertTrue(p in nodes)
00538         nodes = p.get_referenced_nodes(refs=ua.ObjectIds.HasProperty, direction=ua.BrowseDirection.Inverse, includesubtypes=False)
00539         self.assertTrue(o in nodes)
00540         self.assertEqual(p.get_parent(), o)
00541         self.assertEqual(p.get_type_definition().Identifier, ua.ObjectIds.PropertyType)
00542 
00543     def test_path_string(self):
00544         o = self.opc.nodes.objects.add_folder(1, "titif").add_object(3, "opath")
00545         path = o.get_path_as_string()
00546         self.assertEqual(["0:Root", "0:Objects", "1:titif", "3:opath"], path)
00547         path = o.get_path_as_string(2)
00548         self.assertEqual(["1:titif", "3:opath"], path)
00549         path = self.opc.get_node("i=13387").get_path_as_string()
00550         # FIXME this is wrong in our server! BaseObjectType is missing an inverse reference to its parent! seems xml definition is wrong
00551         self.assertEqual(['0:BaseObjectType', '0:FolderType', '0:FileDirectoryType', '0:CreateDirectory'], path) 
00552 
00553     def test_path(self):
00554         of = self.opc.nodes.objects.add_folder(1, "titif")
00555         op = of.add_object(3, "opath")
00556         path = op.get_path()
00557         self.assertEqual([self.opc.nodes.root, self.opc.nodes.objects, of, op], path)
00558         path = op.get_path(2)
00559         self.assertEqual([of, op], path)
00560         target = self.opc.get_node("i=13387")
00561         path = target.get_path()
00562         # FIXME this is wrong in our server! BaseObjectType is missing an inverse reference to its parent! seems xml definition is wrong
00563         self.assertEqual([self.opc.nodes.base_object_type, self.opc.nodes.folder_type, self.opc.get_node(ua.ObjectIds.FileDirectoryType), target], path)  
00564 
00565     def test_get_endpoints(self):
00566         endpoints = self.opc.get_endpoints()
00567         self.assertTrue(len(endpoints) > 0)
00568         self.assertTrue(endpoints[0].EndpointUrl.startswith("opc.tcp://"))
00569 
00570     def test_copy_node(self):
00571         dev_t = self.opc.nodes.base_data_type.add_object_type(0, "MyDevice")
00572         v_t = dev_t.add_variable(0, "sensor", 1.0)
00573         p_t = dev_t.add_property(0, "sensor_id", "0340")
00574         ctrl_t = dev_t.add_object(0, "controller")
00575         prop_t = ctrl_t.add_property(0, "state", "Running")
00576         # Create device sutype
00577         devd_t = dev_t.add_object_type(0, "MyDeviceDervived")
00578         v_t = devd_t.add_variable(0, "childparam", 1.0)
00579         p_t = devd_t.add_property(0, "sensorx_id", "0340")
00580         
00581 
00582         nodes = copy_node(self.opc.nodes.objects, dev_t)
00583         mydevice = nodes[0]
00584 
00585         self.assertEqual(mydevice.get_node_class(), ua.NodeClass.ObjectType)
00586         self.assertEqual(len(mydevice.get_children()), 4)
00587         obj = mydevice.get_child(["0:controller"])
00588         prop = mydevice.get_child(["0:controller", "0:state"])
00589         self.assertEqual(prop.get_type_definition().Identifier, ua.ObjectIds.PropertyType)
00590         self.assertEqual(prop.get_value(), "Running")
00591         self.assertNotEqual(prop.nodeid, prop_t.nodeid)
00592 
00593     def test_instantiate_1(self):
00594         # Create device type
00595         dev_t = self.opc.nodes.base_object_type.add_object_type(0, "MyDevice")
00596         v_t = dev_t.add_variable(0, "sensor", 1.0)
00597         p_t = dev_t.add_property(0, "sensor_id", "0340")
00598         ctrl_t = dev_t.add_object(0, "controller")
00599         prop_t = ctrl_t.add_property(0, "state", "Running")
00600 
00601         # Create device sutype
00602         devd_t = dev_t.add_object_type(0, "MyDeviceDervived")
00603         v_t = devd_t.add_variable(0, "childparam", 1.0)
00604         p_t = devd_t.add_property(0, "sensorx_id", "0340")
00605         
00606         # instanciate device
00607         nodes = instantiate(self.opc.nodes.objects, dev_t, bname="2:Device0001")
00608         mydevice = nodes[0]
00609 
00610         self.assertEqual(mydevice.get_node_class(), ua.NodeClass.Object)
00611         self.assertEqual(mydevice.get_type_definition(), dev_t.nodeid)
00612         obj = mydevice.get_child(["0:controller"])
00613         prop = mydevice.get_child(["0:controller", "0:state"])
00614         self.assertEqual(prop.get_type_definition().Identifier, ua.ObjectIds.PropertyType)
00615         self.assertEqual(prop.get_value(), "Running")
00616         self.assertNotEqual(prop.nodeid, prop_t.nodeid)
00617         
00618         # instanciate device subtype
00619         nodes = instantiate(self.opc.nodes.objects, devd_t, bname="2:Device0002")
00620         mydevicederived = nodes[0] 
00621         prop1 = mydevicederived.get_child(["0:sensorx_id"])
00622         var1 = mydevicederived.get_child(["0:childparam"])
00623         var_parent = mydevicederived.get_child(["0:sensor"])
00624         prop_parent = mydevicederived.get_child(["0:sensor_id"])
00625 
00626     def test_instantiate_string_nodeid(self):
00627         # Create device type
00628         dev_t = self.opc.nodes.base_object_type.add_object_type(0, "MyDevice2")
00629         v_t = dev_t.add_variable(0, "sensor", 1.0)
00630         p_t = dev_t.add_property(0, "sensor_id", "0340")
00631         ctrl_t = dev_t.add_object(0, "controller")
00632         prop_t = ctrl_t.add_property(0, "state", "Running")
00633 
00634         # instanciate device
00635         nodes = instantiate(self.opc.nodes.objects, dev_t, nodeid=ua.NodeId("InstDevice", 2, ua.NodeIdType.String), bname="2:InstDevice")
00636         mydevice = nodes[0]
00637 
00638         self.assertEqual(mydevice.get_node_class(), ua.NodeClass.Object)
00639         self.assertEqual(mydevice.get_type_definition(), dev_t.nodeid)
00640         obj = mydevice.get_child(["0:controller"])
00641         obj_nodeid_ident = obj.nodeid.Identifier
00642         prop = mydevice.get_child(["0:controller", "0:state"])
00643         self.assertEqual(obj_nodeid_ident, "InstDevice.controller")
00644         self.assertEqual(prop.get_type_definition().Identifier, ua.ObjectIds.PropertyType)
00645         self.assertEqual(prop.get_value(), "Running")
00646         self.assertNotEqual(prop.nodeid, prop_t.nodeid)
00647 
00648     def test_variable_with_datatype(self):
00649         v1 = self.opc.nodes.objects.add_variable(3, 'VariableEnumType1', ua.ApplicationType.ClientAndServer, datatype=ua.NodeId(ua.ObjectIds.ApplicationType))
00650         tp1 = v1.get_data_type()
00651         self.assertEqual(ua.NodeId(ua.ObjectIds.ApplicationType), tp1)
00652 
00653         v2 = self.opc.nodes.objects.add_variable(3, 'VariableEnumType2', ua.ApplicationType.ClientAndServer, datatype=ua.NodeId(ua.ObjectIds.ApplicationType) )
00654         tp2 = v2.get_data_type()
00655         self.assertEqual( ua.NodeId(ua.ObjectIds.ApplicationType), tp2)
00656 
00657     def test_enum(self):
00658         # create enum type
00659         enums = self.opc.get_root_node().get_child(["0:Types", "0:DataTypes", "0:BaseDataType", "0:Enumeration"])
00660         myenum_type = enums.add_data_type(0, "MyEnum")
00661         es = myenum_type.add_variable(0, "EnumStrings", [ua.LocalizedText("String0"), ua.LocalizedText("String1"), ua.LocalizedText("String2")], ua.VariantType.LocalizedText) 
00662         #es.set_value_rank(1)
00663         # instantiate
00664         o = self.opc.get_objects_node()
00665         myvar = o.add_variable(2, "MyEnumVar", ua.LocalizedText("String1"), datatype=myenum_type.nodeid)
00666         #myvar.set_writable(True)
00667         # tests
00668         self.assertEqual(myvar.get_data_type(), myenum_type.nodeid)
00669         myvar.set_value(ua.LocalizedText("String2"))
00670 
00671     def test_supertypes(self):
00672         nint32 = self.opc.get_node(ua.ObjectIds.Int32)
00673         node = ua_utils.get_node_supertype(nint32)
00674         self.assertEqual(node, self.opc.get_node(ua.ObjectIds.Integer))
00675 
00676         nodes = ua_utils.get_node_supertypes(nint32)
00677         self.assertEqual(nodes[1], self.opc.get_node(ua.ObjectIds.Number))
00678         self.assertEqual(nodes[0], self.opc.get_node(ua.ObjectIds.Integer))
00679 
00680         # test custom
00681         dtype = nint32.add_data_type(0, "MyCustomDataType")
00682         node = ua_utils.get_node_supertype(dtype)
00683         self.assertEqual(node, nint32)
00684 
00685         dtype2 = dtype.add_data_type(0, "MyCustomDataType2")
00686         node = ua_utils.get_node_supertype(dtype2)
00687         self.assertEqual(node, dtype)
00688 
00689     def test_base_data_type(self):
00690         nint32 = self.opc.get_node(ua.ObjectIds.Int32)
00691         dtype = nint32.add_data_type(0, "MyCustomDataType")
00692         dtype2 = dtype.add_data_type(0, "MyCustomDataType2")
00693         self.assertEqual(ua_utils.get_base_data_type(dtype), nint32)
00694         self.assertEqual(ua_utils.get_base_data_type(dtype2), nint32)
00695 
00696         ext = self.opc.nodes.objects.add_variable(0, "MyExtensionObject", ua.Argument())
00697         d = ext.get_data_type()
00698         d = self.opc.get_node(d)
00699         self.assertEqual(ua_utils.get_base_data_type(d), self.opc.get_node(ua.ObjectIds.Structure))
00700         self.assertEqual(ua_utils.data_type_to_variant_type(d), ua.VariantType.ExtensionObject)
00701 
00702 
00703 
00704 


ros_opcua_impl_python_opcua
Author(s): Denis Štogl , Daniel Draper
autogenerated on Sat Jun 8 2019 18:26:23