00001
00002 from concurrent.futures import Future, TimeoutError
00003 import time
00004 from datetime import datetime, timedelta
00005
00006 import opcua
00007 from opcua import ua
00008
00009
00010 class SubHandler():
00011
00012 """
00013 Dummy subscription client
00014 """
00015
00016 def datachange_notification(self, node, val, data):
00017 pass
00018
00019 def event_notification(self, event):
00020 pass
00021
00022
00023 class MySubHandler():
00024
00025 """
00026 More advanced subscription client using Future, so we can wait for events in tests
00027 """
00028
00029 def __init__(self):
00030 self.future = Future()
00031
00032 def reset(self):
00033 self.future = Future()
00034
00035 def datachange_notification(self, node, val, data):
00036 self.future.set_result((node, val, data))
00037
00038 def event_notification(self, event):
00039 self.future.set_result(event)
00040
00041
00042 class MySubHandler2():
00043 def __init__(self):
00044 self.results = []
00045
00046 def datachange_notification(self, node, val, data):
00047 self.results.append((node, val))
00048
00049 def event_notification(self, event):
00050 self.results.append(event)
00051
00052
00053 class MySubHandlerCounter():
00054 def __init__(self):
00055 self.datachange_count = 0
00056 self.event_count = 0
00057
00058 def datachange_notification(self, node, val, data):
00059 self.datachange_count += 1
00060
00061 def event_notification(self, event):
00062 self.event_count += 1
00063
00064
00065 class SubscriptionTests(object):
00066
00067 def test_subscription_failure(self):
00068 myhandler = MySubHandler()
00069 o = self.opc.get_objects_node()
00070 sub = self.opc.create_subscription(100, myhandler)
00071 with self.assertRaises(ua.UaStatusCodeError):
00072 handle1 = sub.subscribe_data_change(o)
00073 sub.delete()
00074
00075 def test_subscription_overload(self):
00076 nb = 10
00077 myhandler = MySubHandler()
00078 o = self.opc.get_objects_node()
00079 sub = self.opc.create_subscription(1, myhandler)
00080 variables = []
00081 subs = []
00082 for i in range(nb):
00083 v = o.add_variable(3, 'SubscriptionVariableOverload' + str(i), 99)
00084 variables.append(v)
00085 for i in range(nb):
00086 sub.subscribe_data_change(variables)
00087 for i in range(nb):
00088 for j in range(nb):
00089 variables[i].set_value(j)
00090 s = self.opc.create_subscription(1, myhandler)
00091 s.subscribe_data_change(variables)
00092 subs.append(s)
00093 sub.subscribe_data_change(variables[i])
00094 for i in range(nb):
00095 for j in range(nb):
00096 variables[i].set_value(j)
00097 sub.delete()
00098 for s in subs:
00099 s.delete()
00100
00101 def test_subscription_count(self):
00102 myhandler = MySubHandlerCounter()
00103 sub = self.opc.create_subscription(1, myhandler)
00104 o = self.opc.get_objects_node()
00105 var = o.add_variable(3, 'SubVarCounter', 0.1)
00106 sub.subscribe_data_change(var)
00107 nb = 12
00108 for i in range(nb):
00109 val = var.get_value()
00110 var.set_value(val +1)
00111 time.sleep(0.2)
00112 self.assertEqual(myhandler.datachange_count, nb + 1)
00113 sub.delete()
00114
00115 def test_subscription_count_list(self):
00116 myhandler = MySubHandlerCounter()
00117 sub = self.opc.create_subscription(1, myhandler)
00118 o = self.opc.get_objects_node()
00119 var = o.add_variable(3, 'SubVarCounter', [0.1, 0.2])
00120 sub.subscribe_data_change(var)
00121 nb = 12
00122 for i in range(nb):
00123 val = var.get_value()
00124 val.append(i)
00125 var.set_value(val)
00126 time.sleep(0.2)
00127 self.assertEqual(myhandler.datachange_count, nb + 1)
00128 sub.delete()
00129
00130 def test_subscription_count_no_change(self):
00131 myhandler = MySubHandlerCounter()
00132 sub = self.opc.create_subscription(1, myhandler)
00133 o = self.opc.get_objects_node()
00134 var = o.add_variable(3, 'SubVarCounter', [0.1, 0.2])
00135 sub.subscribe_data_change(var)
00136 nb = 12
00137 for i in range(nb):
00138 val = var.get_value()
00139 var.set_value(val)
00140 time.sleep(0.2)
00141 self.assertEqual(myhandler.datachange_count, 1)
00142 sub.delete()
00143
00144 def test_subscription_count_empty(self):
00145 myhandler = MySubHandlerCounter()
00146 sub = self.opc.create_subscription(1, myhandler)
00147 o = self.opc.get_objects_node()
00148 var = o.add_variable(3, 'SubVarCounter', [0.1, 0.2, 0.3])
00149 sub.subscribe_data_change(var)
00150 while True:
00151 val = var.get_value()
00152 val.pop()
00153 var.set_value(val, ua.VariantType.Double)
00154 if not val:
00155 break
00156 time.sleep(0.2)
00157 self.assertEqual(myhandler.datachange_count, 4)
00158 sub.delete()
00159
00160 def test_subscription_overload_simple(self):
00161 nb = 10
00162 myhandler = MySubHandler()
00163 o = self.opc.get_objects_node()
00164 sub = self.opc.create_subscription(1, myhandler)
00165 variables = [o.add_variable(3, 'SubVarOverload' + str(i), i) for i in range(nb)]
00166 for i in range(nb):
00167 sub.subscribe_data_change(variables)
00168 sub.delete()
00169
00170 def test_subscription_data_change(self):
00171 """
00172 test subscriptions. This is far too complicated for
00173 a unittest but, setting up subscriptions requires a lot
00174 of code, so when we first set it up, it is best
00175 to test as many things as possible
00176 """
00177 myhandler = MySubHandler()
00178
00179 o = self.opc.get_objects_node()
00180
00181
00182 startv1 = [1, 2, 3]
00183 v1 = o.add_variable(3, 'SubscriptionVariableV1', startv1)
00184 sub = self.opc.create_subscription(100, myhandler)
00185 handle1 = sub.subscribe_data_change(v1)
00186
00187
00188 node, val, data = myhandler.future.result()
00189 self.assertEqual(val, startv1)
00190 self.assertEqual(node, v1)
00191
00192 myhandler.reset()
00193
00194
00195 v1.set_value([5])
00196 node, val, data = myhandler.future.result()
00197
00198 self.assertEqual(node, v1)
00199 self.assertEqual(val, [5])
00200
00201 with self.assertRaises(ua.UaStatusCodeError):
00202 sub.unsubscribe(999)
00203 sub.unsubscribe(handle1)
00204 with self.assertRaises(ua.UaStatusCodeError):
00205 sub.unsubscribe(handle1)
00206 sub.delete()
00207 with self.assertRaises(ua.UaStatusCodeError):
00208 sub.unsubscribe(handle1)
00209
00210 def test_subscription_data_change_bool(self):
00211 """
00212 test subscriptions. This is far too complicated for
00213 a unittest but, setting up subscriptions requires a lot
00214 of code, so when we first set it up, it is best
00215 to test as many things as possible
00216 """
00217 myhandler = MySubHandler()
00218
00219 o = self.opc.get_objects_node()
00220
00221
00222 startv1 = True
00223 v1 = o.add_variable(3, 'SubscriptionVariableBool', startv1)
00224 sub = self.opc.create_subscription(100, myhandler)
00225 handle1 = sub.subscribe_data_change(v1)
00226
00227
00228 node, val, data = myhandler.future.result()
00229 self.assertEqual(val, startv1)
00230 self.assertEqual(node, v1)
00231
00232 myhandler.reset()
00233
00234
00235 v1.set_value(False)
00236 node, val, data = myhandler.future.result()
00237 self.assertEqual(node, v1)
00238 self.assertEqual(val, False)
00239
00240 sub.delete()
00241
00242 def test_subscription_data_change_many(self):
00243 """
00244 test subscriptions. This is far too complicated for
00245 a unittest but, setting up subscriptions requires a lot
00246 of code, so when we first set it up, it is best
00247 to test as many things as possible
00248 """
00249 myhandler = MySubHandler2()
00250 o = self.opc.get_objects_node()
00251
00252 startv1 = True
00253 v1 = o.add_variable(3, 'SubscriptionVariableMany1', startv1)
00254 startv2 = [1.22, 1.65]
00255 v2 = o.add_variable(3, 'SubscriptionVariableMany2', startv2)
00256
00257 sub = self.opc.create_subscription(100, myhandler)
00258 handle1, handle2 = sub.subscribe_data_change([v1, v2])
00259
00260
00261 nodes = [v1, v2]
00262
00263 count = 0
00264 while not len(myhandler.results) > 1:
00265 count += 1
00266 time.sleep(0.1)
00267 if count > 100:
00268 self.fail("Did not get result from subscription")
00269 for node, val in myhandler.results:
00270 self.assertIn(node, nodes)
00271 nodes.remove(node)
00272 if node == v1:
00273 self.assertEqual(startv1, val)
00274 elif node == v2:
00275 self.assertEqual(startv2, val)
00276 else:
00277 self.fail("Error node {0} is neither {1} nor {2}".format(node, v1, v2))
00278
00279 sub.delete()
00280
00281 def test_subscribe_server_time(self):
00282 myhandler = MySubHandler()
00283
00284 server_time_node = self.opc.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus_CurrentTime))
00285
00286 sub = self.opc.create_subscription(200, myhandler)
00287 handle = sub.subscribe_data_change(server_time_node)
00288
00289 node, val, data = myhandler.future.result()
00290 self.assertEqual(node, server_time_node)
00291 delta = datetime.utcnow() - val
00292 self.assertTrue(delta < timedelta(seconds=2))
00293
00294 sub.unsubscribe(handle)
00295 sub.delete()
00296
00297
00298
00299 def test_create_delete_subscription(self):
00300 o = self.opc.get_objects_node()
00301 v = o.add_variable(3, 'SubscriptionVariable', [1, 2, 3])
00302 sub = self.opc.create_subscription(100, MySubHandler())
00303 handle = sub.subscribe_data_change(v)
00304 time.sleep(0.1)
00305 sub.unsubscribe(handle)
00306 sub.delete()
00307
00308 def test_subscribe_events(self):
00309 sub = self.opc.create_subscription(100, MySubHandler())
00310 handle = sub.subscribe_events()
00311 time.sleep(0.1)
00312 sub.unsubscribe(handle)
00313 sub.delete()
00314
00315 def test_subscribe_events_to_wrong_node(self):
00316 sub = self.opc.create_subscription(100, MySubHandler())
00317 with self.assertRaises(ua.UaStatusCodeError):
00318 handle = sub.subscribe_events(self.opc.get_node("i=85"))
00319 o = self.opc.get_objects_node()
00320 v = o.add_variable(3, 'VariableNoEventNofierAttribute', 4)
00321 with self.assertRaises(ua.UaStatusCodeError):
00322 handle = sub.subscribe_events(v)
00323 sub.delete()
00324
00325 def test_get_event_from_type_node_BaseEvent(self):
00326 etype = self.opc.get_node(ua.ObjectIds.BaseEventType)
00327 properties = opcua.common.events.get_event_properties_from_type_node(etype)
00328 for child in etype.get_properties():
00329 self.assertTrue(child in properties)
00330
00331 def test_get_event_from_type_node_CustomEvent(self):
00332 etype = self.srv.create_custom_event_type(2, 'MyEvent', ua.ObjectIds.AuditEventType, [('PropertyNum', ua.VariantType.Float), ('PropertyString', ua.VariantType.String)])
00333
00334 properties = opcua.common.events.get_event_properties_from_type_node(etype)
00335
00336 for child in self.opc.get_node(ua.ObjectIds.BaseEventType).get_properties():
00337 self.assertTrue(child in properties)
00338 for child in self.opc.get_node(ua.ObjectIds.AuditEventType).get_properties():
00339 self.assertTrue(child in properties)
00340 for child in self.opc.get_node(etype.nodeid).get_properties():
00341 self.assertTrue(child in properties)
00342
00343 self.assertTrue(etype.get_child("2:PropertyNum") in properties)
00344 self.assertTrue(etype.get_child("2:PropertyString") in properties)
00345
00346 def test_events_default(self):
00347 evgen = self.srv.get_event_generator()
00348
00349 myhandler = MySubHandler()
00350 sub = self.opc.create_subscription(100, myhandler)
00351 handle = sub.subscribe_events()
00352
00353 tid = datetime.utcnow()
00354 msg = b"this is my msg "
00355 evgen.trigger(tid, msg)
00356
00357 ev = myhandler.future.result()
00358 self.assertIsNot(ev, None)
00359 self.assertEqual(ev.EventType, ua.NodeId(ua.ObjectIds.BaseEventType))
00360 self.assertEqual(ev.Severity, 1)
00361 self.assertEqual(ev.SourceName, self.opc.get_server_node().get_browse_name().Name)
00362 self.assertEqual(ev.SourceNode, self.opc.get_server_node().nodeid)
00363 self.assertEqual(ev.Message.Text, msg)
00364 self.assertEqual(ev.Time, tid)
00365
00366
00367 sub.unsubscribe(handle)
00368 sub.delete()
00369
00370 def test_events_MyObject(self):
00371 objects = self.srv.get_objects_node()
00372 o = objects.add_object(3, 'MyObject')
00373 evgen = self.srv.get_event_generator(source=o)
00374
00375 myhandler = MySubHandler()
00376 sub = self.opc.create_subscription(100, myhandler)
00377 handle = sub.subscribe_events(o)
00378
00379 tid = datetime.utcnow()
00380 msg = b"this is my msg "
00381 evgen.trigger(tid, msg)
00382
00383 ev = myhandler.future.result(10)
00384 self.assertIsNot(ev, None)
00385 self.assertEqual(ev.EventType, ua.NodeId(ua.ObjectIds.BaseEventType))
00386 self.assertEqual(ev.Severity, 1)
00387 self.assertEqual(ev.SourceName, 'MyObject')
00388 self.assertEqual(ev.SourceNode, o.nodeid)
00389 self.assertEqual(ev.Message.Text, msg)
00390 self.assertEqual(ev.Time, tid)
00391
00392
00393 sub.unsubscribe(handle)
00394 sub.delete()
00395
00396 def test_events_wrong_source(self):
00397 objects = self.srv.get_objects_node()
00398 o = objects.add_object(3, 'MyObject')
00399 evgen = self.srv.get_event_generator(source=o)
00400
00401 myhandler = MySubHandler()
00402 sub = self.opc.create_subscription(100, myhandler)
00403 handle = sub.subscribe_events()
00404
00405 tid = datetime.utcnow()
00406 msg = b"this is my msg "
00407 evgen.trigger(tid, msg)
00408
00409 with self.assertRaises(TimeoutError):
00410 ev = myhandler.future.result(2)
00411
00412
00413 sub.unsubscribe(handle)
00414 sub.delete()
00415
00416 def test_events_CustomEvent(self):
00417 etype = self.srv.create_custom_event_type(2, 'MyEvent', ua.ObjectIds.BaseEventType, [('PropertyNum', ua.VariantType.Float), ('PropertyString', ua.VariantType.String)])
00418 evgen = self.srv.get_event_generator(etype)
00419
00420 myhandler = MySubHandler()
00421 sub = self.opc.create_subscription(100, myhandler)
00422 handle = sub.subscribe_events(evtypes=etype)
00423
00424 propertynum = 2
00425 propertystring = "This is my test"
00426 evgen.event.PropertyNum = propertynum
00427 evgen.event.PropertyString = propertystring
00428 serverity = 500
00429 evgen.event.Severity = serverity
00430 tid = datetime.utcnow()
00431 msg = b"this is my msg "
00432 evgen.trigger(tid, msg)
00433
00434 ev = myhandler.future.result(10)
00435 self.assertIsNot(ev, None)
00436 self.assertEqual(ev.EventType, etype.nodeid)
00437 self.assertEqual(ev.Severity, serverity)
00438 self.assertEqual(ev.SourceName, self.opc.get_server_node().get_browse_name().Name)
00439 self.assertEqual(ev.SourceNode, self.opc.get_server_node().nodeid)
00440 self.assertEqual(ev.Message.Text, msg)
00441 self.assertEqual(ev.Time, tid)
00442 self.assertEqual(ev.PropertyNum, propertynum)
00443 self.assertEqual(ev.PropertyString, propertystring)
00444
00445
00446 sub.unsubscribe(handle)
00447 sub.delete()
00448
00449 def test_events_CustomEvent_MyObject(self):
00450 objects = self.srv.get_objects_node()
00451 o = objects.add_object(3, 'MyObject')
00452 etype = self.srv.create_custom_event_type(2, 'MyEvent', ua.ObjectIds.BaseEventType, [('PropertyNum', ua.VariantType.Float), ('PropertyString', ua.VariantType.String)])
00453 evgen = self.srv.get_event_generator(etype, o)
00454
00455 myhandler = MySubHandler()
00456 sub = self.opc.create_subscription(100, myhandler)
00457 handle = sub.subscribe_events(o, etype)
00458
00459 propertynum = 2
00460 propertystring = "This is my test"
00461 evgen.event.PropertyNum = propertynum
00462 evgen.event.PropertyString = propertystring
00463 tid = datetime.utcnow()
00464 msg = b"this is my msg "
00465 evgen.trigger(tid, msg)
00466
00467 ev = myhandler.future.result(10)
00468 self.assertIsNot(ev, None)
00469 self.assertEqual(ev.EventType, etype.nodeid)
00470 self.assertEqual(ev.Severity, 1)
00471 self.assertEqual(ev.SourceName, 'MyObject')
00472 self.assertEqual(ev.SourceNode, o.nodeid)
00473 self.assertEqual(ev.Message.Text, msg)
00474 self.assertEqual(ev.Time, tid)
00475 self.assertEqual(ev.PropertyNum, propertynum)
00476 self.assertEqual(ev.PropertyString, propertystring)
00477
00478
00479 sub.unsubscribe(handle)
00480 sub.delete()
00481
00482 def test_several_different_events(self):
00483 objects = self.srv.get_objects_node()
00484 o = objects.add_object(3, 'MyObject')
00485
00486 etype1 = self.srv.create_custom_event_type(2, 'MyEvent1', ua.ObjectIds.BaseEventType, [('PropertyNum', ua.VariantType.Float), ('PropertyString', ua.VariantType.String)])
00487 evgen1 = self.srv.get_event_generator(etype1, o)
00488
00489 etype2 = self.srv.create_custom_event_type(2, 'MyEvent2', ua.ObjectIds.BaseEventType, [('PropertyNum', ua.VariantType.Float), ('PropertyString', ua.VariantType.String)])
00490 evgen2 = self.srv.get_event_generator(etype2, o)
00491
00492 myhandler = MySubHandler2()
00493 sub = self.opc.create_subscription(100, myhandler)
00494 handle = sub.subscribe_events(o, etype1)
00495
00496 propertynum1 = 1
00497 propertystring1 = "This is my test 1"
00498 evgen1.event.PropertyNum = propertynum1
00499 evgen1.event.PropertyString = propertystring1
00500
00501 propertynum2 = 2
00502 propertystring2 = "This is my test 2"
00503 evgen2.event.PropertyNum = propertynum2
00504 evgen2.event.PropertyString = propertystring2
00505
00506 for i in range(3):
00507 evgen1.trigger()
00508 evgen2.trigger()
00509 time.sleep(1)
00510
00511 self.assertEqual(len(myhandler.results), 3)
00512 ev = myhandler.results[-1]
00513 self.assertEqual(ev.EventType, etype1.nodeid)
00514
00515 handle = sub.subscribe_events(o, etype2)
00516 for i in range(4):
00517 evgen1.trigger()
00518 evgen2.trigger()
00519 time.sleep(1)
00520
00521
00522 ev1s = [ev for ev in myhandler.results if ev.EventType == etype1.nodeid]
00523 ev2s = [ev for ev in myhandler.results if ev.EventType == etype2.nodeid]
00524
00525 self.assertEqual(len(myhandler.results), 11)
00526 self.assertEqual(len(ev2s), 4)
00527 self.assertEqual(len(ev1s), 7)
00528
00529 sub.unsubscribe(handle)
00530 sub.delete()
00531
00532 def test_several_different_events_2(self):
00533 objects = self.srv.get_objects_node()
00534 o = objects.add_object(3, 'MyObject')
00535
00536 etype1 = self.srv.create_custom_event_type(2, 'MyEvent1', ua.ObjectIds.BaseEventType, [('PropertyNum', ua.VariantType.Float), ('PropertyString', ua.VariantType.String)])
00537 evgen1 = self.srv.get_event_generator(etype1, o)
00538
00539 etype2 = self.srv.create_custom_event_type(2, 'MyEvent2', ua.ObjectIds.BaseEventType, [('PropertyNum2', ua.VariantType.Float), ('PropertyString', ua.VariantType.String)])
00540 evgen2 = self.srv.get_event_generator(etype2, o)
00541
00542 etype3 = self.srv.create_custom_event_type(2, 'MyEvent3', ua.ObjectIds.BaseEventType, [('PropertyNum3', ua.VariantType.Float), ('PropertyString', ua.VariantType.String)])
00543 evgen3 = self.srv.get_event_generator(etype3, o)
00544
00545 myhandler = MySubHandler2()
00546 sub = self.opc.create_subscription(100, myhandler)
00547 handle = sub.subscribe_events(o, [etype1, etype3])
00548
00549 propertynum1 = 1
00550 propertystring1 = "This is my test 1"
00551 evgen1.event.PropertyNum = propertynum1
00552 evgen1.event.PropertyString = propertystring1
00553
00554 propertynum2 = 2
00555 propertystring2 = "This is my test 2"
00556 evgen2.event.PropertyNum2 = propertynum2
00557 evgen2.event.PropertyString = propertystring2
00558
00559 propertynum3 = 3
00560 propertystring3 = "This is my test 3"
00561 evgen3.event.PropertyNum3 = propertynum3
00562 evgen3.event.PropertyString = propertystring2
00563
00564 for i in range(3):
00565 evgen1.trigger()
00566 evgen2.trigger()
00567 evgen3.trigger()
00568 evgen3.event.PropertyNum3 = 9999
00569 evgen3.trigger()
00570 time.sleep(1)
00571
00572 ev1s = [ev for ev in myhandler.results if ev.EventType == etype1.nodeid]
00573 ev2s = [ev for ev in myhandler.results if ev.EventType == etype2.nodeid]
00574 ev3s = [ev for ev in myhandler.results if ev.EventType == etype3.nodeid]
00575
00576 self.assertEqual(len(myhandler.results), 7)
00577 self.assertEqual(len(ev1s), 3)
00578 self.assertEqual(len(ev2s), 0)
00579 self.assertEqual(len(ev3s), 4)
00580 self.assertEqual(ev1s[0].PropertyNum, propertynum1)
00581 self.assertEqual(ev3s[0].PropertyNum3, propertynum3)
00582 self.assertEqual(ev3s[-1].PropertyNum3, 9999)
00583 self.assertEqual(ev1s[0].PropertyNum3, None)
00584
00585 sub.unsubscribe(handle)
00586 sub.delete()
00587