2 from concurrent.futures
import Future, TimeoutError
4 from datetime
import datetime, timedelta
13 Dummy subscription client 26 More advanced subscription client using Future, so we can wait for events in tests 36 self.future.set_result((node, val, data))
39 self.future.set_result(event)
47 self.results.append((node, val))
50 self.results.append(event)
69 o = self.opc.get_objects_node()
70 sub = self.opc.create_subscription(100, myhandler)
71 with self.assertRaises(ua.UaStatusCodeError):
72 handle1 = sub.subscribe_data_change(o)
78 o = self.opc.get_objects_node()
79 sub = self.opc.create_subscription(1, myhandler)
83 v = o.add_variable(3,
'SubscriptionVariableOverload' + str(i), 99)
86 sub.subscribe_data_change(variables)
89 variables[i].set_value(j)
90 s = self.opc.create_subscription(1, myhandler)
91 s.subscribe_data_change(variables)
93 sub.subscribe_data_change(variables[i])
96 variables[i].set_value(j)
103 sub = self.opc.create_subscription(1, myhandler)
104 o = self.opc.get_objects_node()
105 var = o.add_variable(3,
'SubVarCounter', 0.1)
106 sub.subscribe_data_change(var)
109 val = var.get_value()
110 var.set_value(val +1)
112 self.assertEqual(myhandler.datachange_count, nb + 1)
117 sub = self.opc.create_subscription(1, myhandler)
118 o = self.opc.get_objects_node()
119 var = o.add_variable(3,
'SubVarCounter', [0.1, 0.2])
120 sub.subscribe_data_change(var)
123 val = var.get_value()
127 self.assertEqual(myhandler.datachange_count, nb + 1)
132 sub = self.opc.create_subscription(1, myhandler)
133 o = self.opc.get_objects_node()
134 var = o.add_variable(3,
'SubVarCounter', [0.1, 0.2])
135 sub.subscribe_data_change(var)
138 val = var.get_value()
141 self.assertEqual(myhandler.datachange_count, 1)
146 sub = self.opc.create_subscription(1, myhandler)
147 o = self.opc.get_objects_node()
148 var = o.add_variable(3,
'SubVarCounter', [0.1, 0.2, 0.3])
149 sub.subscribe_data_change(var)
151 val = var.get_value()
153 var.set_value(val, ua.VariantType.Double)
157 self.assertEqual(myhandler.datachange_count, 4)
163 o = self.opc.get_objects_node()
164 sub = self.opc.create_subscription(1, myhandler)
165 variables = [o.add_variable(3,
'SubVarOverload' + str(i), i)
for i
in range(nb)]
167 sub.subscribe_data_change(variables)
172 test subscriptions. This is far too complicated for 173 a unittest but, setting up subscriptions requires a lot 174 of code, so when we first set it up, it is best 175 to test as many things as possible 179 o = self.opc.get_objects_node()
183 v1 = o.add_variable(3,
'SubscriptionVariableV1', startv1)
184 sub = self.opc.create_subscription(100, myhandler)
185 handle1 = sub.subscribe_data_change(v1)
188 node, val, data = myhandler.future.result()
189 self.assertEqual(val, startv1)
190 self.assertEqual(node, v1)
196 node, val, data = myhandler.future.result()
198 self.assertEqual(node, v1)
199 self.assertEqual(val, [5])
201 with self.assertRaises(ua.UaStatusCodeError):
203 sub.unsubscribe(handle1)
204 with self.assertRaises(ua.UaStatusCodeError):
205 sub.unsubscribe(handle1)
207 with self.assertRaises(ua.UaStatusCodeError):
208 sub.unsubscribe(handle1)
212 test subscriptions. This is far too complicated for 213 a unittest but, setting up subscriptions requires a lot 214 of code, so when we first set it up, it is best 215 to test as many things as possible 219 o = self.opc.get_objects_node()
223 v1 = o.add_variable(3,
'SubscriptionVariableBool', startv1)
224 sub = self.opc.create_subscription(100, myhandler)
225 handle1 = sub.subscribe_data_change(v1)
228 node, val, data = myhandler.future.result()
229 self.assertEqual(val, startv1)
230 self.assertEqual(node, v1)
236 node, val, data = myhandler.future.result()
237 self.assertEqual(node, v1)
238 self.assertEqual(val,
False)
244 test subscriptions. This is far too complicated for 245 a unittest but, setting up subscriptions requires a lot 246 of code, so when we first set it up, it is best 247 to test as many things as possible 250 o = self.opc.get_objects_node()
253 v1 = o.add_variable(3,
'SubscriptionVariableMany1', startv1)
254 startv2 = [1.22, 1.65]
255 v2 = o.add_variable(3,
'SubscriptionVariableMany2', startv2)
257 sub = self.opc.create_subscription(100, myhandler)
258 handle1, handle2 = sub.subscribe_data_change([v1, v2])
264 while not len(myhandler.results) > 1:
268 self.fail(
"Did not get result from subscription")
269 for node, val
in myhandler.results:
270 self.assertIn(node, nodes)
273 self.assertEqual(startv1, val)
275 self.assertEqual(startv2, val)
277 self.fail(
"Error node {0} is neither {1} nor {2}".format(node, v1, v2))
284 server_time_node = self.opc.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus_CurrentTime))
286 sub = self.opc.create_subscription(200, myhandler)
287 handle = sub.subscribe_data_change(server_time_node)
289 node, val, data = myhandler.future.result()
290 self.assertEqual(node, server_time_node)
291 delta = datetime.utcnow() - val
292 self.assertTrue(delta < timedelta(seconds=2))
294 sub.unsubscribe(handle)
300 o = self.opc.get_objects_node()
301 v = o.add_variable(3,
'SubscriptionVariable', [1, 2, 3])
303 handle = sub.subscribe_data_change(v)
305 sub.unsubscribe(handle)
310 handle = sub.subscribe_events()
312 sub.unsubscribe(handle)
317 with self.assertRaises(ua.UaStatusCodeError):
318 handle = sub.subscribe_events(self.opc.get_node(
"i=85"))
319 o = self.opc.get_objects_node()
320 v = o.add_variable(3,
'VariableNoEventNofierAttribute', 4)
321 with self.assertRaises(ua.UaStatusCodeError):
322 handle = sub.subscribe_events(v)
326 etype = self.opc.get_node(ua.ObjectIds.BaseEventType)
328 for child
in etype.get_properties():
329 self.assertTrue(child
in properties)
332 etype = self.srv.create_custom_event_type(2,
'MyEvent', ua.ObjectIds.AuditEventType, [(
'PropertyNum', ua.VariantType.Float), (
'PropertyString', ua.VariantType.String)])
336 for child
in self.opc.get_node(ua.ObjectIds.BaseEventType).get_properties():
337 self.assertTrue(child
in properties)
338 for child
in self.opc.get_node(ua.ObjectIds.AuditEventType).get_properties():
339 self.assertTrue(child
in properties)
340 for child
in self.opc.get_node(etype.nodeid).get_properties():
341 self.assertTrue(child
in properties)
343 self.assertTrue(etype.get_child(
"2:PropertyNum")
in properties)
344 self.assertTrue(etype.get_child(
"2:PropertyString")
in properties)
347 evgen = self.srv.get_event_generator()
350 sub = self.opc.create_subscription(100, myhandler)
351 handle = sub.subscribe_events()
353 tid = datetime.utcnow()
354 msg = b
"this is my msg " 355 evgen.trigger(tid, msg)
357 ev = myhandler.future.result()
358 self.assertIsNot(ev,
None)
359 self.assertEqual(ev.EventType, ua.NodeId(ua.ObjectIds.BaseEventType))
360 self.assertEqual(ev.Severity, 1)
361 self.assertEqual(ev.SourceName, self.opc.get_server_node().get_browse_name().Name)
362 self.assertEqual(ev.SourceNode, self.opc.get_server_node().nodeid)
363 self.assertEqual(ev.Message.Text, msg)
364 self.assertEqual(ev.Time, tid)
367 sub.unsubscribe(handle)
371 objects = self.srv.get_objects_node()
372 o = objects.add_object(3,
'MyObject')
373 evgen = self.srv.get_event_generator(source=o)
376 sub = self.opc.create_subscription(100, myhandler)
377 handle = sub.subscribe_events(o)
379 tid = datetime.utcnow()
380 msg = b
"this is my msg " 381 evgen.trigger(tid, msg)
383 ev = myhandler.future.result(10)
384 self.assertIsNot(ev,
None)
385 self.assertEqual(ev.EventType, ua.NodeId(ua.ObjectIds.BaseEventType))
386 self.assertEqual(ev.Severity, 1)
387 self.assertEqual(ev.SourceName,
'MyObject')
388 self.assertEqual(ev.SourceNode, o.nodeid)
389 self.assertEqual(ev.Message.Text, msg)
390 self.assertEqual(ev.Time, tid)
393 sub.unsubscribe(handle)
397 objects = self.srv.get_objects_node()
398 o = objects.add_object(3,
'MyObject')
399 evgen = self.srv.get_event_generator(source=o)
402 sub = self.opc.create_subscription(100, myhandler)
403 handle = sub.subscribe_events()
405 tid = datetime.utcnow()
406 msg = b
"this is my msg " 407 evgen.trigger(tid, msg)
409 with self.assertRaises(TimeoutError):
410 ev = myhandler.future.result(2)
413 sub.unsubscribe(handle)
417 etype = self.srv.create_custom_event_type(2,
'MyEvent', ua.ObjectIds.BaseEventType, [(
'PropertyNum', ua.VariantType.Float), (
'PropertyString', ua.VariantType.String)])
418 evgen = self.srv.get_event_generator(etype)
421 sub = self.opc.create_subscription(100, myhandler)
422 handle = sub.subscribe_events(evtypes=etype)
425 propertystring =
"This is my test" 426 evgen.event.PropertyNum = propertynum
427 evgen.event.PropertyString = propertystring
429 evgen.event.Severity = serverity
430 tid = datetime.utcnow()
431 msg = b
"this is my msg " 432 evgen.trigger(tid, msg)
434 ev = myhandler.future.result(10)
435 self.assertIsNot(ev,
None)
436 self.assertEqual(ev.EventType, etype.nodeid)
437 self.assertEqual(ev.Severity, serverity)
438 self.assertEqual(ev.SourceName, self.opc.get_server_node().get_browse_name().Name)
439 self.assertEqual(ev.SourceNode, self.opc.get_server_node().nodeid)
440 self.assertEqual(ev.Message.Text, msg)
441 self.assertEqual(ev.Time, tid)
442 self.assertEqual(ev.PropertyNum, propertynum)
443 self.assertEqual(ev.PropertyString, propertystring)
446 sub.unsubscribe(handle)
450 objects = self.srv.get_objects_node()
451 o = objects.add_object(3,
'MyObject')
452 etype = self.srv.create_custom_event_type(2,
'MyEvent', ua.ObjectIds.BaseEventType, [(
'PropertyNum', ua.VariantType.Float), (
'PropertyString', ua.VariantType.String)])
453 evgen = self.srv.get_event_generator(etype, o)
456 sub = self.opc.create_subscription(100, myhandler)
457 handle = sub.subscribe_events(o, etype)
460 propertystring =
"This is my test" 461 evgen.event.PropertyNum = propertynum
462 evgen.event.PropertyString = propertystring
463 tid = datetime.utcnow()
464 msg = b
"this is my msg " 465 evgen.trigger(tid, msg)
467 ev = myhandler.future.result(10)
468 self.assertIsNot(ev,
None)
469 self.assertEqual(ev.EventType, etype.nodeid)
470 self.assertEqual(ev.Severity, 1)
471 self.assertEqual(ev.SourceName,
'MyObject')
472 self.assertEqual(ev.SourceNode, o.nodeid)
473 self.assertEqual(ev.Message.Text, msg)
474 self.assertEqual(ev.Time, tid)
475 self.assertEqual(ev.PropertyNum, propertynum)
476 self.assertEqual(ev.PropertyString, propertystring)
479 sub.unsubscribe(handle)
483 objects = self.srv.get_objects_node()
484 o = objects.add_object(3,
'MyObject')
486 etype1 = self.srv.create_custom_event_type(2,
'MyEvent1', ua.ObjectIds.BaseEventType, [(
'PropertyNum', ua.VariantType.Float), (
'PropertyString', ua.VariantType.String)])
487 evgen1 = self.srv.get_event_generator(etype1, o)
489 etype2 = self.srv.create_custom_event_type(2,
'MyEvent2', ua.ObjectIds.BaseEventType, [(
'PropertyNum', ua.VariantType.Float), (
'PropertyString', ua.VariantType.String)])
490 evgen2 = self.srv.get_event_generator(etype2, o)
493 sub = self.opc.create_subscription(100, myhandler)
494 handle = sub.subscribe_events(o, etype1)
497 propertystring1 =
"This is my test 1" 498 evgen1.event.PropertyNum = propertynum1
499 evgen1.event.PropertyString = propertystring1
502 propertystring2 =
"This is my test 2" 503 evgen2.event.PropertyNum = propertynum2
504 evgen2.event.PropertyString = propertystring2
511 self.assertEqual(len(myhandler.results), 3)
512 ev = myhandler.results[-1]
513 self.assertEqual(ev.EventType, etype1.nodeid)
515 handle = sub.subscribe_events(o, etype2)
522 ev1s = [ev
for ev
in myhandler.results
if ev.EventType == etype1.nodeid]
523 ev2s = [ev
for ev
in myhandler.results
if ev.EventType == etype2.nodeid]
525 self.assertEqual(len(myhandler.results), 11)
526 self.assertEqual(len(ev2s), 4)
527 self.assertEqual(len(ev1s), 7)
529 sub.unsubscribe(handle)
533 objects = self.srv.get_objects_node()
534 o = objects.add_object(3,
'MyObject')
536 etype1 = self.srv.create_custom_event_type(2,
'MyEvent1', ua.ObjectIds.BaseEventType, [(
'PropertyNum', ua.VariantType.Float), (
'PropertyString', ua.VariantType.String)])
537 evgen1 = self.srv.get_event_generator(etype1, o)
539 etype2 = self.srv.create_custom_event_type(2,
'MyEvent2', ua.ObjectIds.BaseEventType, [(
'PropertyNum2', ua.VariantType.Float), (
'PropertyString', ua.VariantType.String)])
540 evgen2 = self.srv.get_event_generator(etype2, o)
542 etype3 = self.srv.create_custom_event_type(2,
'MyEvent3', ua.ObjectIds.BaseEventType, [(
'PropertyNum3', ua.VariantType.Float), (
'PropertyString', ua.VariantType.String)])
543 evgen3 = self.srv.get_event_generator(etype3, o)
546 sub = self.opc.create_subscription(100, myhandler)
547 handle = sub.subscribe_events(o, [etype1, etype3])
550 propertystring1 =
"This is my test 1" 551 evgen1.event.PropertyNum = propertynum1
552 evgen1.event.PropertyString = propertystring1
555 propertystring2 =
"This is my test 2" 556 evgen2.event.PropertyNum2 = propertynum2
557 evgen2.event.PropertyString = propertystring2
560 propertystring3 =
"This is my test 3" 561 evgen3.event.PropertyNum3 = propertynum3
562 evgen3.event.PropertyString = propertystring2
568 evgen3.event.PropertyNum3 = 9999
572 ev1s = [ev
for ev
in myhandler.results
if ev.EventType == etype1.nodeid]
573 ev2s = [ev
for ev
in myhandler.results
if ev.EventType == etype2.nodeid]
574 ev3s = [ev
for ev
in myhandler.results
if ev.EventType == etype3.nodeid]
576 self.assertEqual(len(myhandler.results), 7)
577 self.assertEqual(len(ev1s), 3)
578 self.assertEqual(len(ev2s), 0)
579 self.assertEqual(len(ev3s), 4)
580 self.assertEqual(ev1s[0].PropertyNum, propertynum1)
581 self.assertEqual(ev3s[0].PropertyNum3, propertynum3)
582 self.assertEqual(ev3s[-1].PropertyNum3, 9999)
583 self.assertEqual(ev1s[0].PropertyNum3,
None)
585 sub.unsubscribe(handle)
def test_events_default(self)
def test_subscription_data_change(self)
def event_notification(self, event)
def test_subscription_data_change_many(self)
def test_several_different_events(self)
def test_get_event_from_type_node_CustomEvent(self)
def test_subscription_count_empty(self)
def test_events_CustomEvent_MyObject(self)
def datachange_notification(self, node, val, data)
def test_events_wrong_source(self)
def test_subscribe_events(self)
def test_subscription_overload(self)
def test_subscribe_events_to_wrong_node(self)
def test_subscription_count(self)
def test_subscription_data_change_bool(self)
def test_subscription_count_list(self)
def test_create_delete_subscription(self)
def test_events_CustomEvent(self)
def datachange_notification(self, node, val, data)
def test_several_different_events_2(self)
def test_events_MyObject(self)
def test_subscription_failure(self)
def get_event_properties_from_type_node(node)
def test_subscription_count_no_change(self)
def test_subscription_overload_simple(self)
def event_notification(self, event)
def datachange_notification(self, node, val, data)
def event_notification(self, event)
def event_notification(self, event)
def datachange_notification(self, node, val, data)
def test_subscribe_server_time(self)
def test_get_event_from_type_node_BaseEvent(self)