00001
00002
00003 import sys
00004 sys.path.insert(0, "../../build/bin/")
00005 import datetime
00006 import unittest
00007 from threading import Thread, Event
00008 try:
00009 from queue import Queue
00010 except ImportError:
00011 from Queue import Queue
00012 import time
00013 from threading import Condition
00014 import opcua
00015
# TCP ports of the two test servers: port_num1 is used by the threaded server
# that TestClient connects to, port_num2 by the server created in TestServer.
port_num1, port_num2 = 48410, 48430
00018
class SubHandler(opcua.SubscriptionHandler):
    '''
    Subscription handler that discards every notification.
    For tests that need a subscription but do not look at what it delivers.
    '''

    def data_change(self, handle, node, val, attr):
        '''Ignore a data change notification.'''
        return None

    def event(self, handle, event):
        '''Ignore an event notification.'''
        return None
00028
class MySubHandler(opcua.SubscriptionHandler):
    '''
    Subscription handler that records the last notification it received and
    notifies a condition, so tests can block until something has arrived.

    setup() must be called before the handler is handed to a subscription:
    it creates the condition and the attributes the callbacks write to.
    '''

    def setup(self):
        '''
        Create the condition and reset the recorded notification.

        Returns the condition that is notified on every data change or event.
        '''
        self.cond = Condition()
        self.node = None
        self.handle = None
        self.attribute = None
        self.value = None
        self.ev = None
        return self.cond

    def data_change(self, handle, node, val, attr):
        '''Record the data change and wake up every thread waiting on the condition.'''
        # All fields are written while holding the condition's lock, so a
        # reader that holds the condition never sees a half-updated record.
        with self.cond:
            self.handle = handle
            self.node = node
            self.value = val
            self.attribute = attr
            self.cond.notify_all()

    def event(self, handle, event):
        '''Record the event and wake up every thread waiting on the condition.'''
        with self.cond:
            self.ev = event
            self.cond.notify_all()
00054
class Unit(unittest.TestCase):
    '''
    Simple unit tests that do not need to set up a server or a client.
    They only exercise the data types exposed by the opcua module.
    '''

    def test_equal_nodeid(self):
        # Two ids built from the same arguments compare equal but are
        # distinct Python objects.
        nid1 = opcua.NodeId(999, 2)
        nid2 = opcua.NodeId(999, 2)
        self.assertEqual(nid1, nid2)
        self.assertIsNot(nid1, nid2)

    def test_zero_nodeid(self):
        self.assertEqual(opcua.NodeId(), opcua.NodeId(0, 0))
        self.assertEqual(opcua.NodeId(), opcua.NodeId('ns=0;i=0;'))

    def test_string_nodeid(self):
        nid = opcua.NodeId('titi', 1)
        self.assertEqual(nid.namespace_index, 1)
        self.assertEqual(nid.identifier, 'titi')
        self.assertTrue(nid.is_string)

    def test_numeric_nodeid(self):
        nid = opcua.NodeId(999, 2)
        self.assertEqual(nid.namespace_index, 2)
        self.assertEqual(nid.identifier, 999)
        self.assertTrue(nid.is_integer)

    def test_qualifiedstring_nodeid(self):
        nid = opcua.NodeId('ns=2;s=PLC1.Manufacturer;')
        self.assertEqual(nid.namespace_index, 2)
        self.assertEqual(nid.identifier, 'PLC1.Manufacturer')

    def test_strrepr_nodeid(self):
        nid = opcua.NodeId('ns=2;s=PLC1.Manufacturer;')
        self.assertEqual(str(nid), 'ns=2;s=PLC1.Manufacturer;')
        self.assertEqual(repr(nid), 'ns=2;s=PLC1.Manufacturer;')

    def test_qualified_name(self):
        qn = opcua.QualifiedName('qname', 2)
        self.assertEqual(qn.namespace_index, 2)
        self.assertEqual(qn.name, 'qname')
        self.assertEqual(repr(qn), 'QualifiedName(2:qname)')
        self.assertEqual(repr(qn), str(qn))
        # The constructor is also called with the arguments in the other order.
        qn2 = opcua.QualifiedName(2, 'qname')
        self.assertEqual(qn, qn2)

    def test_datavalue(self):
        dv = opcua.DataValue(123)
        self.assertEqual(dv.value, 123)
        dv = opcua.DataValue('abc')
        self.assertEqual(dv.value, 'abc')
        tnow = int(time.time())
        dv.source_timestamp = opcua.DateTime.from_time_t(tnow)
        self.assertEqual(dv.source_timestamp.to_time_t(), tnow)
        dv = opcua.DataValue(True, opcua.VariantType.BOOLEAN)
        self.assertEqual(dv.value, True)
        # The value must come back as a real bool, not as an int.
        self.assertIs(type(dv.value), bool)

    def test_application_description(self):
        ad = opcua.ApplicationDescription()
        self.assertEqual(ad.type, opcua.ApplicationType.Server)
        ad.discovery_urls = ['a', 'b', 'c']
        self.assertEqual(ad.discovery_urls, ['a', 'b', 'c'])

    def test_user_token_policy(self):
        utp = opcua.UserTokenPolicy()
        self.assertEqual(utp.token_type, opcua.UserTokenType.Anonymous)

    def test_endpoint_description(self):
        ed = opcua.EndpointDescription()
        self.assertEqual(ed.security_mode, opcua.MessageSecurityMode.Invalid)
        self.assertEqual(ed.security_level, 0)
        ed.server_description = opcua.ApplicationDescription()
        self.assertEqual(ed.user_identify_tokens, [])
        ed.user_identify_tokens = [opcua.UserTokenPolicy()] * 3
        self.assertEqual(len(ed.user_identify_tokens), 3)

    def test_reference_description(self):
        rd = opcua.ReferenceDescription()
        self.assertEqual(rd.browse_name, opcua.QualifiedName())
        self.assertEqual(rd.is_forward, False)
        self.assertEqual(rd.reference_type_id, opcua.NodeId())
        self.assertEqual(rd.target_node_class, opcua.NodeClass.Unspecified)
        self.assertEqual(rd.target_node_id, opcua.NodeId())
        self.assertEqual(rd.target_node_type_definition, opcua.NodeId())

    def test_attribute_valueid(self):
        avid = opcua.ReadValueId()
        self.assertEqual(avid.node_id, opcua.NodeId())
        self.assertEqual(avid.attribute_id, opcua.AttributeId())
        self.assertEqual(avid.index_range, '')
        self.assertEqual(avid.data_encoding, opcua.QualifiedName())

    def test_write_value(self):
        wv = opcua.WriteValue()
        self.assertEqual(wv.node_id, opcua.NodeId())
        self.assertEqual(wv.attribute_id, opcua.AttributeId())
        self.assertEqual(wv.index_range, '')
        self.assertEqual(wv.value.value, None)

    def test_datetime(self):
        # Sanity check of the two ways this file computes "seconds since the
        # epoch". The clock is read twice, so the results may legitimately
        # differ by one second when a second boundary falls between the reads.
        tnow1 = int(time.time())
        tnow = int((datetime.datetime.utcnow() - datetime.datetime(1970, 1, 1)).total_seconds())
        self.assertAlmostEqual(tnow, tnow1, delta=1)

        # time_t -> DateTime -> time_t
        dt = opcua.DateTime.from_time_t(tnow)
        self.assertEqual(tnow, dt.to_time_t())

        # DateTime -> datetime.datetime
        pydt = dt.to_datetime()
        self.assertEqual(tnow, int((pydt - datetime.datetime(1970, 1, 1)).total_seconds()))

        # datetime.datetime -> DateTime -> datetime.datetime.
        # The conversion back must be done on dt2; converting dt again would
        # only compare pydt with itself and never test the constructor.
        dt2 = opcua.DateTime(pydt)
        pydt2 = dt2.to_datetime()
        self.assertEqual(pydt, pydt2)

    def test_localized_text(self):
        event = opcua.Event()
        txt = "This is string"
        event.message = txt
        self.assertEqual(txt, event.message)
00177
00178
00179
class CommonTests(object):
    '''
    Tests that will be run twice. Once on server side and once on
    client side since we have been careful to have the exact
    same api on server and client side.

    The test case mixing this class in must provide self.opc (the client or
    server under test) and self.srv (an object with a trigger_event method).
    '''

    def _wait_until(self, cond, predicate, timeout):
        '''
        Wait on cond until predicate() is true or timeout seconds have passed.

        The predicate is checked while holding the condition before the first
        wait, so a notification that arrived before this call is not missed.
        Returns True when the predicate became true, False on timeout.
        Works on Python 2 as well, where Condition.wait() returns None.
        '''
        deadline = time.time() + timeout
        with cond:
            while not predicate():
                remaining = deadline - time.time()
                if remaining <= 0:
                    return False
                cond.wait(remaining)
            return True

    def test_root(self):
        root = self.opc.get_root_node()
        self.assertEqual(opcua.QualifiedName('Root', 0), root.get_browse_name())
        nid = opcua.NodeId(84, 0)
        self.assertEqual(nid, root.get_id())

    def test_objects(self):
        objects = self.opc.get_objects_node()
        self.assertEqual(opcua.QualifiedName('Objects', 0), objects.get_browse_name())
        nid = opcua.NodeId(85, 0)
        self.assertEqual(nid, objects.get_id())

    def test_add_nodes(self):
        objects = self.opc.get_objects_node()
        f = objects.add_folder(3, 'MyFolder')
        v = f.add_variable(3, 'MyVariable', 6)
        p = f.add_property(3, 'MyProperty', 10)
        childs = f.get_children()
        self.assertTrue(v in childs)
        self.assertTrue(p in childs)

    def test_add_numeric_variable(self):
        objects = self.opc.get_objects_node()
        v = objects.add_variable('ns=3;i=888;', '3:numericnodefromstring', 99)
        nid = opcua.NodeId(888, 3)
        qn = opcua.QualifiedName('numericnodefromstring', 3)
        self.assertEqual(nid, v.get_id())
        self.assertEqual(qn, v.get_browse_name())

    def test_add_string_variable(self):
        objects = self.opc.get_objects_node()
        v = objects.add_variable('ns=3;s=stringid;', '3:stringnodefromstring', [68])
        nid = opcua.NodeId('stringid', 3)
        qn = opcua.QualifiedName('stringnodefromstring', 3)
        self.assertEqual(nid, v.get_id())
        self.assertEqual(qn, v.get_browse_name())

    def test_add_string_array_variable(self):
        objects = self.opc.get_objects_node()
        v = objects.add_variable('ns=3;s=stringarrayid;', '9:stringarray', ['l', 'b'])
        nid = opcua.NodeId('stringarrayid', 3)
        qn = opcua.QualifiedName('stringarray', 9)
        self.assertEqual(nid, v.get_id())
        self.assertEqual(qn, v.get_browse_name())
        val = v.get_value()
        self.assertEqual(['l', 'b'], val)

    def test_add_numeric_node(self):
        objects = self.opc.get_objects_node()
        nid = opcua.NodeId(9999, 3)
        qn = opcua.QualifiedName('AddNodeVar1', 3)
        v1 = objects.add_variable(nid, qn, 0)
        self.assertEqual(nid, v1.get_id())
        self.assertEqual(qn, v1.get_browse_name())

    def test_add_string_node(self):
        objects = self.opc.get_objects_node()
        qn = opcua.QualifiedName('AddNodeVar2', 3)
        nid = opcua.NodeId('AddNodeVar2Id', 3)
        v2 = objects.add_variable(nid, qn, 0)
        self.assertEqual(nid, v2.get_id())
        self.assertEqual(qn, v2.get_browse_name())

    def test_add_find_node_(self):
        objects = self.opc.get_objects_node()
        o = objects.add_object('ns=2;i=101;', '2:AddFindObject')
        o2 = objects.get_child('2:AddFindObject')
        self.assertEqual(o, o2)

    def test_node_path(self):
        objects = self.opc.get_objects_node()
        o = objects.add_object('ns=2;i=105;', '2:NodePathObject')
        root = self.opc.get_root_node()
        o2 = root.get_child(['0:Objects', '2:NodePathObject'])
        self.assertEqual(o, o2)

    def test_add_read_node(self):
        objects = self.opc.get_objects_node()
        o = objects.add_object('ns=2;i=102;', '2:AddReadObject')
        nid = opcua.NodeId(102, 2)
        self.assertEqual(o.get_id(), nid)
        qn = opcua.QualifiedName('AddReadObject', 2)
        self.assertEqual(o.get_browse_name(), qn)

    def test_simple_value(self):
        o = self.opc.get_objects_node()
        v = o.add_variable(3, 'VariableTestValue', 4.32)
        val = v.get_value()
        self.assertEqual(4.32, val)

    def test_add_exception(self):
        # Adding a second node with an id that is already in use must fail.
        objects = self.opc.get_objects_node()
        objects.add_object('ns=2;i=103;', '2:AddReadObject')
        with self.assertRaises(RuntimeError):
            objects.add_object('ns=2;i=103;', '2:AddReadObject')

    def test_negative_value(self):
        o = self.opc.get_objects_node()
        v = o.add_variable(3, 'VariableNegativeValue', 4)
        v.set_value(-4.54)
        val = v.get_value()
        self.assertEqual(-4.54, val)

    def test_read_server_state(self):
        statenode = self.opc.get_node(opcua.ObjectId.Server_ServerStatus_State)
        state = statenode.get_value()
        self.assertEqual(state, 0)

    def test_array_value(self):
        o = self.opc.get_objects_node()
        v = o.add_variable(3, 'VariableArrayValue', [1, 2, 3])
        val = v.get_value()
        self.assertEqual([1, 2, 3], val)

    def test_array_size_one_value(self):
        o = self.opc.get_objects_node()
        v = o.add_variable(3, 'VariableArrayValue', [1, 2, 3])
        v.set_value([1])
        val = v.get_value()
        self.assertEqual([1], val)

    def test_create_delete_subscription(self):
        o = self.opc.get_objects_node()
        v = o.add_variable(3, 'SubscriptionVariable', [1, 2, 3])
        # Use a local handler instead of a module global that only exists
        # when the file is run as a script.
        handler = SubHandler()
        sub = self.opc.create_subscription(100, handler)
        handle = sub.subscribe_data_change(v)
        time.sleep(0.1)
        sub.unsubscribe(handle)
        sub.delete()

    def test_subscribe_events(self):
        handler = SubHandler()
        sub = self.opc.create_subscription(100, handler)
        handle = sub.subscribe_events()

        sub.unsubscribe(handle)
        sub.delete()

    def test_events(self):
        msclt = MySubHandler()
        cond = msclt.setup()
        sub = self.opc.create_subscription(100, msclt)
        handle = sub.subscribe_events()
        try:
            ev = opcua.Event()
            msg = "this is my msg "
            ev.message = msg
            tid = datetime.datetime.now()
            ev.time = tid
            ev.source_node = self.opc.get_server_node().get_id()
            ev.source_name = "our server node"
            ev.severity = 500
            self.srv.trigger_event(ev)

            # Bounded wait: a missing event fails the test after a few
            # seconds instead of blocking the run for hours.
            self.assertTrue(self._wait_until(cond, lambda: msclt.ev is not None, 5.0))
            self.assertEqual(msclt.ev.message, msg)
            self.assertEqual(msclt.ev.time.to_datetime(), tid)
            self.assertEqual(msclt.ev.severity, 500)
            self.assertEqual(msclt.ev.source_node, self.opc.get_server_node().get_id())
        finally:
            # Release the subscription even when an assertion failed.
            sub.unsubscribe(handle)
            sub.delete()

    def test_get_namespace_index(self):
        idx = self.opc.get_namespace_index('http://freeopcua.github.io')
        self.assertEqual(idx, 1)

    def test_use_namespace(self):
        root = self.opc.get_root_node()
        idx = self.opc.get_namespace_index('http://freeopcua.github.io')
        o = root.add_object(idx, 'test_namespace')
        self.assertEqual(idx, o.get_id().namespace_index)
        o2 = root.get_child('{}:test_namespace'.format(idx))
        self.assertEqual(o, o2)

    def test_non_existing_node(self):
        root = self.opc.get_root_node()
        with self.assertRaises(RuntimeError):
            root.get_child(['0:Objects', '0:Server', '0:nonexistingnode'])

    def test_subscription_data_change(self):
        '''
        test subscriptions. This is far too complicated for a unittest but, setting up subscriptions requires a lot of code, so when we first set it up, it is best to test as many things as possible
        '''
        msclt = MySubHandler()
        cond = msclt.setup()

        o = self.opc.get_objects_node()

        startv1 = [1, 2, 3]
        v1 = o.add_variable(3, 'SubscriptionVariableV1', startv1)
        sub = self.opc.create_subscription(100, msclt)
        handle1 = sub.subscribe_data_change(v1)
        try:
            # Subscribing is expected to deliver the current value first.
            # MySubHandler assigns value after node, so once value is set
            # node is set as well.
            self.assertTrue(self._wait_until(cond, lambda: msclt.value is not None, 0.5))
            self.assertEqual(msclt.value, startv1)
            self.assertEqual(msclt.node, v1)

            # A write must be reported with the new value.
            v1.set_value([5])
            self.assertTrue(self._wait_until(cond, lambda: msclt.value == [5], 0.5))
            self.assertEqual(msclt.node, v1)
            self.assertEqual(msclt.value, [5])
        finally:
            sub.unsubscribe(handle1)
            sub.delete()

    def test_get_node_by_nodeid(self):
        root = self.opc.get_root_node()
        server_time_node = root.get_child(['0:Objects', '0:Server', '0:ServerStatus', '0:CurrentTime'])
        correct = self.opc.get_node(opcua.ObjectId.Server_ServerStatus_CurrentTime)
        self.assertEqual(server_time_node, correct)

    def test_subscribe_server_time(self):
        msclt = MySubHandler()
        cond = msclt.setup()

        server_time_node = self.opc.get_node(opcua.ObjectId.Server_ServerStatus_CurrentTime)

        sub = self.opc.create_subscription(200, msclt)
        handle = sub.subscribe_data_change(server_time_node)
        try:
            self.assertTrue(self._wait_until(cond, lambda: msclt.node is not None, 0.5))
            self.assertEqual(msclt.node, server_time_node)
        finally:
            sub.unsubscribe(handle)
            sub.delete()

    def test_datetime_read(self):
        time_node = self.opc.get_node(opcua.ObjectId.Server_ServerStatus_CurrentTime)
        dt = time_node.get_value()
        pydt = dt.to_datetime()
        utcnow = datetime.datetime.utcnow()
        delta = utcnow - pydt
        self.assertTrue(delta < datetime.timedelta(seconds=1))

    def test_datetime_write(self):
        # A datetime written as a variable value must be read back unchanged.
        now = datetime.datetime.now()
        objects = self.opc.get_objects_node()
        v1 = objects.add_variable(4, "test_datetime", now)
        tid = v1.get_value()
        self.assertEqual(now, tid.to_datetime())
00447
00448
00449
00450
class ServerProcess(Thread):
    '''
    Run an opcua server in a background thread.

    The thread sets the started event once the attempt to start the server
    has finished, forwards events queued with trigger_event() to the server,
    and stops the server after stop() has been called.
    '''
    def __init__(self):
        Thread.__init__(self)
        self._exit = Event()     # set by stop() to end the run() loop
        self.started = Event()   # set by run() after the server start attempt
        self._queue = Queue()    # events waiting to be triggered on the server

    def run(self):
        '''Start the server, serve queued events until stop() is called, then stop the server.'''
        self.srv = opcua.Server()
        self.srv.set_endpoint('opc.tcp://localhost:%d' % port_num1)
        try:
            self.srv.start()
        finally:
            # Always release threads blocked in started.wait(); without this
            # a failing start would leave them waiting forever.
            self.started.set()
        try:
            while not self._exit.is_set():
                time.sleep(0.1)
                if not self._queue.empty():
                    ev = self._queue.get()
                    self.srv.trigger_event(ev)
        finally:
            # Stop the server even when triggering an event raised.
            self.srv.stop()

    def stop(self):
        '''Ask the thread to leave its loop and stop the server.'''
        self._exit.set()

    def trigger_event(self, ev):
        '''Queue ev; the server thread triggers it on its next loop iteration.'''
        self._queue.put(ev)
00478
00479
class TestClient(unittest.TestCase, CommonTests):
    '''
    Run the common tests on the client side.
    The server is the module-level globalserver object; it is started here
    and a client connected to it becomes the object under test.
    Tests that can only be run on the client side must be defined here.
    '''
    @classmethod
    def setUpClass(cls):
        # start the shared server and wait until it reports that it is up
        cls.srv = globalserver
        cls.srv.start()
        cls.srv.started.wait()

        # connect a client; the common tests operate on cls.opc
        endpoint = 'opc.tcp://localhost:%d' % port_num1
        cls.clt = opcua.Client()
        cls.clt.connect(endpoint)
        cls.opc = cls.clt

    @classmethod
    def tearDownClass(cls):
        cls.clt.disconnect()
        # ask the server thread to stop, then wait for it to finish
        cls.srv.stop()
        cls.srv.join()
00508
00509
class TestServer(unittest.TestCase, CommonTests):
    '''
    Run the common tests directly against a server object.
    Tests that can only be run on the server side must be defined here.
    '''
    @classmethod
    def setUpClass(cls):
        endpoint = 'opc.tcp://localhost:%d' % port_num2
        cls.srv = opcua.Server()
        cls.srv.set_endpoint(endpoint)
        cls.srv.start()
        # the server itself is the object the common tests operate on
        cls.opc = cls.srv

    @classmethod
    def tearDownClass(cls):
        cls.srv.stop()

    def test_register_namespace(self):
        '''The index returned on registration is the one found by a later lookup.'''
        uri = 'http://mycustom.namespace.com'
        registered = self.opc.register_namespace(uri)
        looked_up = self.opc.get_namespace_index(uri)
        self.assertEqual(registered, looked_up)

    def test_register_use_namespace(self):
        '''A variable added with a freshly registered index gets an id in that namespace.'''
        uri = 'http://my_very_custom.namespace.com'
        idx = self.opc.register_namespace(uri)
        root = self.opc.get_root_node()
        myvar = root.add_variable(idx, 'var_in_custom_namespace', [5])
        self.assertEqual(idx, myvar.get_id().namespace_index)
00539
00540
00541
00542
00543
if __name__ == '__main__':
    # Module-level objects used by the test classes: the server thread that
    # TestClient starts, and the dummy subscription handler.
    globalserver = ServerProcess()
    sclt = SubHandler()
    try:
        unittest.main(verbosity=3)
    finally:
        # unittest.main() leaves through SystemExit; always tell the server
        # thread to stop so it cannot keep the interpreter alive.
        globalserver.stop()
00551