tests_subscriptions.py
Go to the documentation of this file.
00001 
00002 from concurrent.futures import Future, TimeoutError
00003 import time
00004 from datetime import datetime, timedelta
00005 
00006 import opcua
00007 from opcua import ua
00008 
00009 
class SubHandler:
    """
    Dummy subscription client.

    Provides the two notification callbacks and discards whatever they are
    given.
    """

    def datachange_notification(self, node, val, data):
        """Accept a data-change notification and ignore it."""
        return None

    def event_notification(self, event):
        """Accept an event notification and ignore it."""
        return None
00021 
00022 
class MySubHandler:
    """
    More advanced subscription client using Future, so we can wait for events in tests.

    The first notification received after construction (or after ``reset()``)
    completes ``self.future``. Further notifications are ignored until
    ``reset()`` installs a fresh future, because completing an already
    finished Future raises ``InvalidStateError`` on recent Python versions.
    """

    def __init__(self):
        # Future completed by the next notification; tests block on .result()
        self.future = Future()

    def reset(self):
        """Install a new pending future so the next notification can be awaited."""
        self.future = Future()

    def _complete(self, result):
        """Store ``result`` in the current future unless it is already completed."""
        # Read the attribute once so a concurrent reset() cannot swap the
        # future between the check and the call.
        future = self.future
        if not future.done():
            future.set_result(result)

    def datachange_notification(self, node, val, data):
        """Complete the future with the tuple ``(node, val, data)``."""
        self._complete((node, val, data))

    def event_notification(self, event):
        """Complete the future with ``event``."""
        self._complete(event)
00040 
00041 
class MySubHandler2:
    """
    Subscription client that keeps every notification it receives.

    Notifications are appended to ``results`` in arrival order: data changes
    as ``(node, val)`` tuples, events as the event object itself.
    """

    def __init__(self):
        self.results = list()

    def datachange_notification(self, node, val, data):
        """Record the node and its value; ``data`` is not stored."""
        entry = (node, val)
        self.results.append(entry)

    def event_notification(self, event):
        """Record the received event."""
        self.results.append(event)
00051 
00052 
class MySubHandlerCounter:
    """
    Subscription client that only counts notifications.

    ``datachange_count`` and ``event_count`` start at zero and grow by one
    per notification of the matching kind.
    """

    def __init__(self):
        self.datachange_count, self.event_count = 0, 0

    def datachange_notification(self, node, val, data):
        """Count one data-change notification; the arguments are not inspected."""
        self.datachange_count = self.datachange_count + 1

    def event_notification(self, event):
        """Count one event notification; the event is not inspected."""
        self.event_count = self.event_count + 1
00063 
00064 
class SubscriptionTests(object):
    """
    Subscription and event test cases.

    This class is a mixin: it calls ``self.assert*`` / ``self.fail`` and reads
    ``self.opc`` and ``self.srv``, none of which are defined here, so the class
    it is combined with has to provide them.
    """

    def test_subscription_failure(self):
        """Subscribing to data changes of the Objects node must raise."""
        myhandler = MySubHandler()
        o = self.opc.get_objects_node()
        sub = self.opc.create_subscription(100, myhandler)
        with self.assertRaises(ua.UaStatusCodeError):
            # we can only subscribe to variables so this should fail
            sub.subscribe_data_change(o)
        sub.delete()

    def test_subscription_overload(self):
        """
        Flood one handler with many subscriptions, monitored items and writes.

        There is no assertion; the test passes when nothing raises.
        """
        nb = 10
        myhandler = MySubHandler()
        o = self.opc.get_objects_node()
        sub = self.opc.create_subscription(1, myhandler)
        variables = [
            o.add_variable(3, 'SubscriptionVariableOverload' + str(i), 99)
            for i in range(nb)
        ]
        subs = []
        # monitor the whole variable list nb times on the first subscription
        for _ in range(nb):
            sub.subscribe_data_change(variables)
        for i in range(nb):
            for j in range(nb):
                variables[i].set_value(j)
            # one more subscription per variable, each watching every variable
            s = self.opc.create_subscription(1, myhandler)
            s.subscribe_data_change(variables)
            subs.append(s)
            sub.subscribe_data_change(variables[i])
        for i in range(nb):
            for j in range(nb):
                variables[i].set_value(j)
        sub.delete()
        for s in subs:
            s.delete()

    def test_subscription_count(self):
        """Each of ``nb`` increments of a float variable yields one notification."""
        myhandler = MySubHandlerCounter()
        sub = self.opc.create_subscription(1, myhandler)
        o = self.opc.get_objects_node()
        var = o.add_variable(3, 'SubVarCounter', 0.1)
        sub.subscribe_data_change(var)
        nb = 12
        for _ in range(nb):
            val = var.get_value()
            var.set_value(val + 1)
        time.sleep(0.2)  # let last event arrive
        # nb changes plus the notification for the start value
        self.assertEqual(myhandler.datachange_count, nb + 1)
        sub.delete()

    def test_subscription_count_list(self):
        """Each of ``nb`` appends to a list variable yields one notification."""
        myhandler = MySubHandlerCounter()
        sub = self.opc.create_subscription(1, myhandler)
        o = self.opc.get_objects_node()
        var = o.add_variable(3, 'SubVarCounter', [0.1, 0.2])
        sub.subscribe_data_change(var)
        nb = 12
        for i in range(nb):
            val = var.get_value()
            val.append(i)
            var.set_value(val)
        time.sleep(0.2)  # let last event arrive
        # nb changes plus the notification for the start value
        self.assertEqual(myhandler.datachange_count, nb + 1)
        sub.delete()

    def test_subscription_count_no_change(self):
        """Writing back the value just read must not produce notifications."""
        myhandler = MySubHandlerCounter()
        sub = self.opc.create_subscription(1, myhandler)
        o = self.opc.get_objects_node()
        var = o.add_variable(3, 'SubVarCounter', [0.1, 0.2])
        sub.subscribe_data_change(var)
        nb = 12
        for _ in range(nb):
            val = var.get_value()
            var.set_value(val)
        time.sleep(0.2)  # let last event arrive
        # only the notification for the start value is expected
        self.assertEqual(myhandler.datachange_count, 1)
        sub.delete()

    def test_subscription_count_empty(self):
        """Shrinking a three-element list down to the empty list notifies each step."""
        myhandler = MySubHandlerCounter()
        sub = self.opc.create_subscription(1, myhandler)
        o = self.opc.get_objects_node()
        var = o.add_variable(3, 'SubVarCounter', [0.1, 0.2, 0.3])
        sub.subscribe_data_change(var)
        while True:
            val = var.get_value()
            val.pop()
            # the variant type is passed explicitly on every write, including
            # the final write of the empty list
            var.set_value(val, ua.VariantType.Double)
            if not val:
                break
        time.sleep(0.2)  # let last event arrive
        # three writes plus the notification for the start value
        self.assertEqual(myhandler.datachange_count, 4)
        sub.delete()

    def test_subscription_overload_simple(self):
        """
        Monitor the same list of variables ``nb`` times on one subscription.

        There is no assertion; the test passes when nothing raises.
        """
        nb = 10
        myhandler = MySubHandler()
        o = self.opc.get_objects_node()
        sub = self.opc.create_subscription(1, myhandler)
        variables = [o.add_variable(3, 'SubVarOverload' + str(i), i) for i in range(nb)]
        for _ in range(nb):
            sub.subscribe_data_change(variables)
        sub.delete()

    def test_subscription_data_change(self):
        """
        Walk through a data-change subscription: start value, value update,
        unsubscribing and deleting.

        This is far too complicated for a unittest, but setting up
        subscriptions requires a lot of code, so it is best to test as many
        things as possible in one go.
        """
        myhandler = MySubHandler()

        o = self.opc.get_objects_node()

        # subscribe to a variable
        startv1 = [1, 2, 3]
        v1 = o.add_variable(3, 'SubscriptionVariableV1', startv1)
        sub = self.opc.create_subscription(100, myhandler)
        handle1 = sub.subscribe_data_change(v1)

        # Now check we get the start value; the timeout keeps a missing
        # notification from blocking the test run forever
        node, val, data = myhandler.future.result(10)
        self.assertEqual(val, startv1)
        self.assertEqual(node, v1)

        myhandler.reset()  # reset future object

        # modify v1 and check we get value
        v1.set_value([5])
        node, val, data = myhandler.future.result(10)

        self.assertEqual(node, v1)
        self.assertEqual(val, [5])

        with self.assertRaises(ua.UaStatusCodeError):
            sub.unsubscribe(999)  # non existing handle
        sub.unsubscribe(handle1)
        with self.assertRaises(ua.UaStatusCodeError):
            sub.unsubscribe(handle1)  # second try should fail
        sub.delete()
        with self.assertRaises(ua.UaStatusCodeError):
            sub.unsubscribe(handle1)  # sub does not exist anymore

    def test_subscription_data_change_bool(self):
        """
        Check start value and update notifications for a boolean variable,
        then delete the subscription without unsubscribing first.
        """
        myhandler = MySubHandler()

        o = self.opc.get_objects_node()

        # subscribe to a variable
        startv1 = True
        v1 = o.add_variable(3, 'SubscriptionVariableBool', startv1)
        sub = self.opc.create_subscription(100, myhandler)
        sub.subscribe_data_change(v1)

        # Now check we get the start value; the timeout keeps a missing
        # notification from blocking the test run forever
        node, val, data = myhandler.future.result(10)
        self.assertEqual(val, startv1)
        self.assertEqual(node, v1)

        myhandler.reset()  # reset future object

        # modify v1 and check we get value
        v1.set_value(False)
        node, val, data = myhandler.future.result(10)
        self.assertEqual(node, v1)
        self.assertEqual(val, False)

        sub.delete()  # should delete our monitoreditem too

    def test_subscription_data_change_many(self):
        """
        Subscribe to two variables with a single call and check that each
        one reports its start value exactly once.
        """
        myhandler = MySubHandler2()
        o = self.opc.get_objects_node()

        startv1 = True
        v1 = o.add_variable(3, 'SubscriptionVariableMany1', startv1)
        startv2 = [1.22, 1.65]
        v2 = o.add_variable(3, 'SubscriptionVariableMany2', startv2)

        sub = self.opc.create_subscription(100, myhandler)
        # unpacking fails unless exactly one handle per node is returned
        handle1, handle2 = sub.subscribe_data_change([v1, v2])

        # Now check we get the start values
        nodes = [v1, v2]

        # poll for both notifications, giving up after about 10 seconds
        count = 0
        while len(myhandler.results) < 2:
            count += 1
            time.sleep(0.1)
            if count > 100:
                self.fail("Did not get result from subscription")
        for node, val in myhandler.results:
            self.assertIn(node, nodes)
            # removing the node makes a second notification for it fail above
            nodes.remove(node)
            if node == v1:
                self.assertEqual(startv1, val)
            elif node == v2:
                self.assertEqual(startv2, val)
            else:
                self.fail("Error node {0} is neither {1} nor {2}".format(node, v1, v2))

        sub.delete()

    def test_subscribe_server_time(self):
        """The server's CurrentTime node reports a time less than 2 s behind ours."""
        myhandler = MySubHandler()

        server_time_node = self.opc.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus_CurrentTime))

        sub = self.opc.create_subscription(200, myhandler)
        handle = sub.subscribe_data_change(server_time_node)

        # the timeout keeps a missing notification from blocking forever
        node, val, data = myhandler.future.result(10)
        self.assertEqual(node, server_time_node)
        delta = datetime.utcnow() - val
        self.assertLess(delta, timedelta(seconds=2))

        sub.unsubscribe(handle)
        sub.delete()

    def test_create_delete_subscription(self):
        """Create a data-change subscription, then unsubscribe and delete it."""
        o = self.opc.get_objects_node()
        v = o.add_variable(3, 'SubscriptionVariable', [1, 2, 3])
        sub = self.opc.create_subscription(100, MySubHandler())
        handle = sub.subscribe_data_change(v)
        time.sleep(0.1)
        sub.unsubscribe(handle)
        sub.delete()

    def test_subscribe_events(self):
        """Create an event subscription with default arguments, then remove it."""
        sub = self.opc.create_subscription(100, MySubHandler())
        handle = sub.subscribe_events()
        time.sleep(0.1)
        sub.unsubscribe(handle)
        sub.delete()

    def test_subscribe_events_to_wrong_node(self):
        """Subscribing to events on node ``i=85`` or on a plain variable must raise."""
        sub = self.opc.create_subscription(100, MySubHandler())
        with self.assertRaises(ua.UaStatusCodeError):
            sub.subscribe_events(self.opc.get_node("i=85"))
        o = self.opc.get_objects_node()
        v = o.add_variable(3, 'VariableNoEventNofierAttribute', 4)
        with self.assertRaises(ua.UaStatusCodeError):
            sub.subscribe_events(v)
        sub.delete()

    def test_get_event_from_type_node_BaseEvent(self):
        """Every property of BaseEventType is returned for the BaseEventType node."""
        etype = self.opc.get_node(ua.ObjectIds.BaseEventType)
        properties = opcua.common.events.get_event_properties_from_type_node(etype)
        for child in etype.get_properties():
            self.assertIn(child, properties)

    def test_get_event_from_type_node_CustomEvent(self):
        """
        A custom event type derived from AuditEventType exposes the
        properties of BaseEventType, of AuditEventType and its own.
        """
        etype = self.srv.create_custom_event_type(
            2, 'MyEvent', ua.ObjectIds.AuditEventType,
            [('PropertyNum', ua.VariantType.Float), ('PropertyString', ua.VariantType.String)])

        properties = opcua.common.events.get_event_properties_from_type_node(etype)

        for child in self.opc.get_node(ua.ObjectIds.BaseEventType).get_properties():
            self.assertIn(child, properties)
        for child in self.opc.get_node(ua.ObjectIds.AuditEventType).get_properties():
            self.assertIn(child, properties)
        for child in self.opc.get_node(etype.nodeid).get_properties():
            self.assertIn(child, properties)

        self.assertIn(etype.get_child("2:PropertyNum"), properties)
        self.assertIn(etype.get_child("2:PropertyString"), properties)

    def test_events_default(self):
        """An event from the default generator reaches a default event subscription."""
        evgen = self.srv.get_event_generator()

        myhandler = MySubHandler()
        sub = self.opc.create_subscription(100, myhandler)
        handle = sub.subscribe_events()

        tid = datetime.utcnow()
        msg = b"this is my msg "
        evgen.trigger(tid, msg)

        # same timeout as the other event tests, so a lost event fails the
        # test instead of blocking it forever
        ev = myhandler.future.result(10)
        self.assertIsNotNone(ev)  # we did not receive event
        self.assertEqual(ev.EventType, ua.NodeId(ua.ObjectIds.BaseEventType))
        self.assertEqual(ev.Severity, 1)
        self.assertEqual(ev.SourceName, self.opc.get_server_node().get_browse_name().Name)
        self.assertEqual(ev.SourceNode, self.opc.get_server_node().nodeid)
        self.assertEqual(ev.Message.Text, msg)
        self.assertEqual(ev.Time, tid)

        sub.unsubscribe(handle)
        sub.delete()

    def test_events_MyObject(self):
        """An event whose source is a custom object reaches a subscription on that object."""
        objects = self.srv.get_objects_node()
        o = objects.add_object(3, 'MyObject')
        evgen = self.srv.get_event_generator(source=o)

        myhandler = MySubHandler()
        sub = self.opc.create_subscription(100, myhandler)
        handle = sub.subscribe_events(o)

        tid = datetime.utcnow()
        msg = b"this is my msg "
        evgen.trigger(tid, msg)

        ev = myhandler.future.result(10)
        self.assertIsNotNone(ev)  # we did not receive event
        self.assertEqual(ev.EventType, ua.NodeId(ua.ObjectIds.BaseEventType))
        self.assertEqual(ev.Severity, 1)
        self.assertEqual(ev.SourceName, 'MyObject')
        self.assertEqual(ev.SourceNode, o.nodeid)
        self.assertEqual(ev.Message.Text, msg)
        self.assertEqual(ev.Time, tid)

        sub.unsubscribe(handle)
        sub.delete()

    def test_events_wrong_source(self):
        """An event from a custom object must not reach a default event subscription."""
        objects = self.srv.get_objects_node()
        o = objects.add_object(3, 'MyObject')
        evgen = self.srv.get_event_generator(source=o)

        myhandler = MySubHandler()
        sub = self.opc.create_subscription(100, myhandler)
        handle = sub.subscribe_events()

        tid = datetime.utcnow()
        msg = b"this is my msg "
        evgen.trigger(tid, msg)

        with self.assertRaises(TimeoutError):  # we should not receive event
            myhandler.future.result(2)

        sub.unsubscribe(handle)
        sub.delete()

    def test_events_CustomEvent(self):
        """A custom event type delivers its standard fields and extra properties."""
        etype = self.srv.create_custom_event_type(
            2, 'MyEvent', ua.ObjectIds.BaseEventType,
            [('PropertyNum', ua.VariantType.Float), ('PropertyString', ua.VariantType.String)])
        evgen = self.srv.get_event_generator(etype)

        myhandler = MySubHandler()
        sub = self.opc.create_subscription(100, myhandler)
        handle = sub.subscribe_events(evtypes=etype)

        propertynum = 2
        propertystring = "This is my test"
        evgen.event.PropertyNum = propertynum
        evgen.event.PropertyString = propertystring
        severity = 500
        evgen.event.Severity = severity
        tid = datetime.utcnow()
        msg = b"this is my msg "
        evgen.trigger(tid, msg)

        ev = myhandler.future.result(10)
        self.assertIsNotNone(ev)  # we did not receive event
        self.assertEqual(ev.EventType, etype.nodeid)
        self.assertEqual(ev.Severity, severity)
        self.assertEqual(ev.SourceName, self.opc.get_server_node().get_browse_name().Name)
        self.assertEqual(ev.SourceNode, self.opc.get_server_node().nodeid)
        self.assertEqual(ev.Message.Text, msg)
        self.assertEqual(ev.Time, tid)
        self.assertEqual(ev.PropertyNum, propertynum)
        self.assertEqual(ev.PropertyString, propertystring)

        sub.unsubscribe(handle)
        sub.delete()

    def test_events_CustomEvent_MyObject(self):
        """A custom event type emitted by a custom object reaches a matching subscription."""
        objects = self.srv.get_objects_node()
        o = objects.add_object(3, 'MyObject')
        etype = self.srv.create_custom_event_type(
            2, 'MyEvent', ua.ObjectIds.BaseEventType,
            [('PropertyNum', ua.VariantType.Float), ('PropertyString', ua.VariantType.String)])
        evgen = self.srv.get_event_generator(etype, o)

        myhandler = MySubHandler()
        sub = self.opc.create_subscription(100, myhandler)
        handle = sub.subscribe_events(o, etype)

        propertynum = 2
        propertystring = "This is my test"
        evgen.event.PropertyNum = propertynum
        evgen.event.PropertyString = propertystring
        tid = datetime.utcnow()
        msg = b"this is my msg "
        evgen.trigger(tid, msg)

        ev = myhandler.future.result(10)
        self.assertIsNotNone(ev)  # we did not receive event
        self.assertEqual(ev.EventType, etype.nodeid)
        self.assertEqual(ev.Severity, 1)
        self.assertEqual(ev.SourceName, 'MyObject')
        self.assertEqual(ev.SourceNode, o.nodeid)
        self.assertEqual(ev.Message.Text, msg)
        self.assertEqual(ev.Time, tid)
        self.assertEqual(ev.PropertyNum, propertynum)
        self.assertEqual(ev.PropertyString, propertystring)

        sub.unsubscribe(handle)
        sub.delete()

    def test_several_different_events(self):
        """
        Subscribe to one event type, then add a second one, and check that
        only events of subscribed types are counted.
        """
        objects = self.srv.get_objects_node()
        o = objects.add_object(3, 'MyObject')

        etype1 = self.srv.create_custom_event_type(
            2, 'MyEvent1', ua.ObjectIds.BaseEventType,
            [('PropertyNum', ua.VariantType.Float), ('PropertyString', ua.VariantType.String)])
        evgen1 = self.srv.get_event_generator(etype1, o)

        etype2 = self.srv.create_custom_event_type(
            2, 'MyEvent2', ua.ObjectIds.BaseEventType,
            [('PropertyNum', ua.VariantType.Float), ('PropertyString', ua.VariantType.String)])
        evgen2 = self.srv.get_event_generator(etype2, o)

        myhandler = MySubHandler2()
        sub = self.opc.create_subscription(100, myhandler)
        handle1 = sub.subscribe_events(o, etype1)

        propertynum1 = 1
        propertystring1 = "This is my test 1"
        evgen1.event.PropertyNum = propertynum1
        evgen1.event.PropertyString = propertystring1

        propertynum2 = 2
        propertystring2 = "This is my test 2"
        evgen2.event.PropertyNum = propertynum2
        evgen2.event.PropertyString = propertystring2

        for _ in range(3):
            evgen1.trigger()
            evgen2.trigger()
        time.sleep(1)

        # only etype1 is subscribed so far: 3 of the 6 events are expected
        self.assertEqual(len(myhandler.results), 3)
        ev = myhandler.results[-1]
        self.assertEqual(ev.EventType, etype1.nodeid)

        # keep this handle separate so that both can be unsubscribed below
        handle2 = sub.subscribe_events(o, etype2)
        for _ in range(4):
            evgen1.trigger()
            evgen2.trigger()
        time.sleep(1)

        ev1s = [ev for ev in myhandler.results if ev.EventType == etype1.nodeid]
        ev2s = [ev for ev in myhandler.results if ev.EventType == etype2.nodeid]

        # 3 earlier events, plus 4 of each type now that both are subscribed
        self.assertEqual(len(myhandler.results), 11)
        self.assertEqual(len(ev2s), 4)
        self.assertEqual(len(ev1s), 7)

        sub.unsubscribe(handle1)
        sub.unsubscribe(handle2)
        sub.delete()

    def test_several_different_events_2(self):
        """
        Subscribe to two of three event types with a single call and check
        which events and which properties are delivered.
        """
        objects = self.srv.get_objects_node()
        o = objects.add_object(3, 'MyObject')

        etype1 = self.srv.create_custom_event_type(
            2, 'MyEvent1', ua.ObjectIds.BaseEventType,
            [('PropertyNum', ua.VariantType.Float), ('PropertyString', ua.VariantType.String)])
        evgen1 = self.srv.get_event_generator(etype1, o)

        etype2 = self.srv.create_custom_event_type(
            2, 'MyEvent2', ua.ObjectIds.BaseEventType,
            [('PropertyNum2', ua.VariantType.Float), ('PropertyString', ua.VariantType.String)])
        evgen2 = self.srv.get_event_generator(etype2, o)

        etype3 = self.srv.create_custom_event_type(
            2, 'MyEvent3', ua.ObjectIds.BaseEventType,
            [('PropertyNum3', ua.VariantType.Float), ('PropertyString', ua.VariantType.String)])
        evgen3 = self.srv.get_event_generator(etype3, o)

        myhandler = MySubHandler2()
        sub = self.opc.create_subscription(100, myhandler)
        handle = sub.subscribe_events(o, [etype1, etype3])

        propertynum1 = 1
        propertystring1 = "This is my test 1"
        evgen1.event.PropertyNum = propertynum1
        evgen1.event.PropertyString = propertystring1

        propertynum2 = 2
        propertystring2 = "This is my test 2"
        evgen2.event.PropertyNum2 = propertynum2
        evgen2.event.PropertyString = propertystring2

        propertynum3 = 3
        propertystring3 = "This is my test 3"
        evgen3.event.PropertyNum3 = propertynum3
        evgen3.event.PropertyString = propertystring3

        for _ in range(3):
            evgen1.trigger()
            evgen2.trigger()
            evgen3.trigger()
        evgen3.event.PropertyNum3 = 9999
        evgen3.trigger()
        time.sleep(1)

        ev1s = [ev for ev in myhandler.results if ev.EventType == etype1.nodeid]
        ev2s = [ev for ev in myhandler.results if ev.EventType == etype2.nodeid]
        ev3s = [ev for ev in myhandler.results if ev.EventType == etype3.nodeid]

        # etype2 is not subscribed: 3 events of etype1 and 4 of etype3 remain
        self.assertEqual(len(myhandler.results), 7)
        self.assertEqual(len(ev1s), 3)
        self.assertEqual(len(ev2s), 0)
        self.assertEqual(len(ev3s), 4)
        self.assertEqual(ev1s[0].PropertyNum, propertynum1)
        self.assertEqual(ev3s[0].PropertyNum3, propertynum3)
        self.assertEqual(ev3s[-1].PropertyNum3, 9999)
        # etype1 events are expected to report None for etype3's property
        self.assertEqual(ev1s[0].PropertyNum3, None)

        sub.unsubscribe(handle)
        sub.delete()
00587 


ros_opcua_impl_python_opcua
Author(s): Denis Štogl , Daniel Draper
autogenerated on Sat Jun 8 2019 18:26:23