tests_subscriptions.py
Go to the documentation of this file.
1 
2 from concurrent.futures import Future, TimeoutError
3 import time
4 from datetime import datetime, timedelta
5 
6 import opcua
7 from opcua import ua
8 
9 
10 class SubHandler():
11 
12  """
13  Dummy subscription client
14  """
15 
16  def datachange_notification(self, node, val, data):
17  pass
18 
19  def event_notification(self, event):
20  pass
21 
22 
23 class MySubHandler():
24 
25  """
26  More advanced subscription client using Future, so we can wait for events in tests
27  """
28 
29  def __init__(self):
30  self.future = Future()
31 
32  def reset(self):
33  self.future = Future()
34 
35  def datachange_notification(self, node, val, data):
36  self.future.set_result((node, val, data))
37 
38  def event_notification(self, event):
39  self.future.set_result(event)
40 
41 
42 class MySubHandler2():
43  def __init__(self):
44  self.results = []
45 
46  def datachange_notification(self, node, val, data):
47  self.results.append((node, val))
48 
49  def event_notification(self, event):
50  self.results.append(event)
51 
52 
54  def __init__(self):
56  self.event_count = 0
57 
58  def datachange_notification(self, node, val, data):
59  self.datachange_count += 1
60 
61  def event_notification(self, event):
62  self.event_count += 1
63 
64 
65 class SubscriptionTests(object):
66 
68  myhandler = MySubHandler()
69  o = self.opc.get_objects_node()
70  sub = self.opc.create_subscription(100, myhandler)
71  with self.assertRaises(ua.UaStatusCodeError):
72  handle1 = sub.subscribe_data_change(o) # we can only subscribe to variables so this should fail
73  sub.delete()
74 
76  nb = 10
77  myhandler = MySubHandler()
78  o = self.opc.get_objects_node()
79  sub = self.opc.create_subscription(1, myhandler)
80  variables = []
81  subs = []
82  for i in range(nb):
83  v = o.add_variable(3, 'SubscriptionVariableOverload' + str(i), 99)
84  variables.append(v)
85  for i in range(nb):
86  sub.subscribe_data_change(variables)
87  for i in range(nb):
88  for j in range(nb):
89  variables[i].set_value(j)
90  s = self.opc.create_subscription(1, myhandler)
91  s.subscribe_data_change(variables)
92  subs.append(s)
93  sub.subscribe_data_change(variables[i])
94  for i in range(nb):
95  for j in range(nb):
96  variables[i].set_value(j)
97  sub.delete()
98  for s in subs:
99  s.delete()
100 
102  myhandler = MySubHandlerCounter()
103  sub = self.opc.create_subscription(1, myhandler)
104  o = self.opc.get_objects_node()
105  var = o.add_variable(3, 'SubVarCounter', 0.1)
106  sub.subscribe_data_change(var)
107  nb = 12
108  for i in range(nb):
109  val = var.get_value()
110  var.set_value(val +1)
111  time.sleep(0.2) # let last event arrive
112  self.assertEqual(myhandler.datachange_count, nb + 1)
113  sub.delete()
114 
116  myhandler = MySubHandlerCounter()
117  sub = self.opc.create_subscription(1, myhandler)
118  o = self.opc.get_objects_node()
119  var = o.add_variable(3, 'SubVarCounter', [0.1, 0.2])
120  sub.subscribe_data_change(var)
121  nb = 12
122  for i in range(nb):
123  val = var.get_value()
124  val.append(i)
125  var.set_value(val)
126  time.sleep(0.2) # let last event arrive
127  self.assertEqual(myhandler.datachange_count, nb + 1)
128  sub.delete()
129 
131  myhandler = MySubHandlerCounter()
132  sub = self.opc.create_subscription(1, myhandler)
133  o = self.opc.get_objects_node()
134  var = o.add_variable(3, 'SubVarCounter', [0.1, 0.2])
135  sub.subscribe_data_change(var)
136  nb = 12
137  for i in range(nb):
138  val = var.get_value()
139  var.set_value(val)
140  time.sleep(0.2) # let last event arrive
141  self.assertEqual(myhandler.datachange_count, 1)
142  sub.delete()
143 
145  myhandler = MySubHandlerCounter()
146  sub = self.opc.create_subscription(1, myhandler)
147  o = self.opc.get_objects_node()
148  var = o.add_variable(3, 'SubVarCounter', [0.1, 0.2, 0.3])
149  sub.subscribe_data_change(var)
150  while True:
151  val = var.get_value()
152  val.pop()
153  var.set_value(val, ua.VariantType.Double)
154  if not val:
155  break
156  time.sleep(0.2) # let last event arrive
157  self.assertEqual(myhandler.datachange_count, 4)
158  sub.delete()
159 
161  nb = 10
162  myhandler = MySubHandler()
163  o = self.opc.get_objects_node()
164  sub = self.opc.create_subscription(1, myhandler)
165  variables = [o.add_variable(3, 'SubVarOverload' + str(i), i) for i in range(nb)]
166  for i in range(nb):
167  sub.subscribe_data_change(variables)
168  sub.delete()
169 
171  """
172  test subscriptions. This is far too complicated for
173  a unittest but, setting up subscriptions requires a lot
174  of code, so when we first set it up, it is best
175  to test as many things as possible
176  """
177  myhandler = MySubHandler()
178 
179  o = self.opc.get_objects_node()
180 
181  # subscribe to a variable
182  startv1 = [1, 2, 3]
183  v1 = o.add_variable(3, 'SubscriptionVariableV1', startv1)
184  sub = self.opc.create_subscription(100, myhandler)
185  handle1 = sub.subscribe_data_change(v1)
186 
187  # Now check we get the start value
188  node, val, data = myhandler.future.result()
189  self.assertEqual(val, startv1)
190  self.assertEqual(node, v1)
191 
192  myhandler.reset() # reset future object
193 
194  # modify v1 and check we get value
195  v1.set_value([5])
196  node, val, data = myhandler.future.result()
197 
198  self.assertEqual(node, v1)
199  self.assertEqual(val, [5])
200 
201  with self.assertRaises(ua.UaStatusCodeError):
202  sub.unsubscribe(999) # non existing handle
203  sub.unsubscribe(handle1)
204  with self.assertRaises(ua.UaStatusCodeError):
205  sub.unsubscribe(handle1) # second try should fail
206  sub.delete()
207  with self.assertRaises(ua.UaStatusCodeError):
208  sub.unsubscribe(handle1) # sub does not exist anymore
209 
211  """
212  test subscriptions. This is far too complicated for
213  a unittest but, setting up subscriptions requires a lot
214  of code, so when we first set it up, it is best
215  to test as many things as possible
216  """
217  myhandler = MySubHandler()
218 
219  o = self.opc.get_objects_node()
220 
221  # subscribe to a variable
222  startv1 = True
223  v1 = o.add_variable(3, 'SubscriptionVariableBool', startv1)
224  sub = self.opc.create_subscription(100, myhandler)
225  handle1 = sub.subscribe_data_change(v1)
226 
227  # Now check we get the start value
228  node, val, data = myhandler.future.result()
229  self.assertEqual(val, startv1)
230  self.assertEqual(node, v1)
231 
232  myhandler.reset() # reset future object
233 
234  # modify v1 and check we get value
235  v1.set_value(False)
236  node, val, data = myhandler.future.result()
237  self.assertEqual(node, v1)
238  self.assertEqual(val, False)
239 
240  sub.delete() # should delete our monitoreditem too
241 
243  """
244  test subscriptions. This is far too complicated for
245  a unittest but, setting up subscriptions requires a lot
246  of code, so when we first set it up, it is best
247  to test as many things as possible
248  """
249  myhandler = MySubHandler2()
250  o = self.opc.get_objects_node()
251 
252  startv1 = True
253  v1 = o.add_variable(3, 'SubscriptionVariableMany1', startv1)
254  startv2 = [1.22, 1.65]
255  v2 = o.add_variable(3, 'SubscriptionVariableMany2', startv2)
256 
257  sub = self.opc.create_subscription(100, myhandler)
258  handle1, handle2 = sub.subscribe_data_change([v1, v2])
259 
260  # Now check we get the start values
261  nodes = [v1, v2]
262 
263  count = 0
264  while not len(myhandler.results) > 1:
265  count += 1
266  time.sleep(0.1)
267  if count > 100:
268  self.fail("Did not get result from subscription")
269  for node, val in myhandler.results:
270  self.assertIn(node, nodes)
271  nodes.remove(node)
272  if node == v1:
273  self.assertEqual(startv1, val)
274  elif node == v2:
275  self.assertEqual(startv2, val)
276  else:
277  self.fail("Error node {0} is neither {1} nor {2}".format(node, v1, v2))
278 
279  sub.delete()
280 
282  myhandler = MySubHandler()
283 
284  server_time_node = self.opc.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus_CurrentTime))
285 
286  sub = self.opc.create_subscription(200, myhandler)
287  handle = sub.subscribe_data_change(server_time_node)
288 
289  node, val, data = myhandler.future.result()
290  self.assertEqual(node, server_time_node)
291  delta = datetime.utcnow() - val
292  self.assertTrue(delta < timedelta(seconds=2))
293 
294  sub.unsubscribe(handle)
295  sub.delete()
296 
297 
298 
300  o = self.opc.get_objects_node()
301  v = o.add_variable(3, 'SubscriptionVariable', [1, 2, 3])
302  sub = self.opc.create_subscription(100, MySubHandler())
303  handle = sub.subscribe_data_change(v)
304  time.sleep(0.1)
305  sub.unsubscribe(handle)
306  sub.delete()
307 
309  sub = self.opc.create_subscription(100, MySubHandler())
310  handle = sub.subscribe_events()
311  time.sleep(0.1)
312  sub.unsubscribe(handle)
313  sub.delete()
314 
316  sub = self.opc.create_subscription(100, MySubHandler())
317  with self.assertRaises(ua.UaStatusCodeError):
318  handle = sub.subscribe_events(self.opc.get_node("i=85"))
319  o = self.opc.get_objects_node()
320  v = o.add_variable(3, 'VariableNoEventNofierAttribute', 4)
321  with self.assertRaises(ua.UaStatusCodeError):
322  handle = sub.subscribe_events(v)
323  sub.delete()
324 
326  etype = self.opc.get_node(ua.ObjectIds.BaseEventType)
328  for child in etype.get_properties():
329  self.assertTrue(child in properties)
330 
332  etype = self.srv.create_custom_event_type(2, 'MyEvent', ua.ObjectIds.AuditEventType, [('PropertyNum', ua.VariantType.Float), ('PropertyString', ua.VariantType.String)])
333 
335 
336  for child in self.opc.get_node(ua.ObjectIds.BaseEventType).get_properties():
337  self.assertTrue(child in properties)
338  for child in self.opc.get_node(ua.ObjectIds.AuditEventType).get_properties():
339  self.assertTrue(child in properties)
340  for child in self.opc.get_node(etype.nodeid).get_properties():
341  self.assertTrue(child in properties)
342 
343  self.assertTrue(etype.get_child("2:PropertyNum") in properties)
344  self.assertTrue(etype.get_child("2:PropertyString") in properties)
345 
347  evgen = self.srv.get_event_generator()
348 
349  myhandler = MySubHandler()
350  sub = self.opc.create_subscription(100, myhandler)
351  handle = sub.subscribe_events()
352 
353  tid = datetime.utcnow()
354  msg = b"this is my msg "
355  evgen.trigger(tid, msg)
356 
357  ev = myhandler.future.result()
358  self.assertIsNot(ev, None) # we did not receive event
359  self.assertEqual(ev.EventType, ua.NodeId(ua.ObjectIds.BaseEventType))
360  self.assertEqual(ev.Severity, 1)
361  self.assertEqual(ev.SourceName, self.opc.get_server_node().get_browse_name().Name)
362  self.assertEqual(ev.SourceNode, self.opc.get_server_node().nodeid)
363  self.assertEqual(ev.Message.Text, msg)
364  self.assertEqual(ev.Time, tid)
365 
366  # time.sleep(0.1)
367  sub.unsubscribe(handle)
368  sub.delete()
369 
371  objects = self.srv.get_objects_node()
372  o = objects.add_object(3, 'MyObject')
373  evgen = self.srv.get_event_generator(source=o)
374 
375  myhandler = MySubHandler()
376  sub = self.opc.create_subscription(100, myhandler)
377  handle = sub.subscribe_events(o)
378 
379  tid = datetime.utcnow()
380  msg = b"this is my msg "
381  evgen.trigger(tid, msg)
382 
383  ev = myhandler.future.result(10)
384  self.assertIsNot(ev, None) # we did not receive event
385  self.assertEqual(ev.EventType, ua.NodeId(ua.ObjectIds.BaseEventType))
386  self.assertEqual(ev.Severity, 1)
387  self.assertEqual(ev.SourceName, 'MyObject')
388  self.assertEqual(ev.SourceNode, o.nodeid)
389  self.assertEqual(ev.Message.Text, msg)
390  self.assertEqual(ev.Time, tid)
391 
392  # time.sleep(0.1)
393  sub.unsubscribe(handle)
394  sub.delete()
395 
397  objects = self.srv.get_objects_node()
398  o = objects.add_object(3, 'MyObject')
399  evgen = self.srv.get_event_generator(source=o)
400 
401  myhandler = MySubHandler()
402  sub = self.opc.create_subscription(100, myhandler)
403  handle = sub.subscribe_events()
404 
405  tid = datetime.utcnow()
406  msg = b"this is my msg "
407  evgen.trigger(tid, msg)
408 
409  with self.assertRaises(TimeoutError): # we should not receive event
410  ev = myhandler.future.result(2)
411 
412  # time.sleep(0.1)
413  sub.unsubscribe(handle)
414  sub.delete()
415 
417  etype = self.srv.create_custom_event_type(2, 'MyEvent', ua.ObjectIds.BaseEventType, [('PropertyNum', ua.VariantType.Float), ('PropertyString', ua.VariantType.String)])
418  evgen = self.srv.get_event_generator(etype)
419 
420  myhandler = MySubHandler()
421  sub = self.opc.create_subscription(100, myhandler)
422  handle = sub.subscribe_events(evtypes=etype)
423 
424  propertynum = 2
425  propertystring = "This is my test"
426  evgen.event.PropertyNum = propertynum
427  evgen.event.PropertyString = propertystring
428  serverity = 500
429  evgen.event.Severity = serverity
430  tid = datetime.utcnow()
431  msg = b"this is my msg "
432  evgen.trigger(tid, msg)
433 
434  ev = myhandler.future.result(10)
435  self.assertIsNot(ev, None) # we did not receive event
436  self.assertEqual(ev.EventType, etype.nodeid)
437  self.assertEqual(ev.Severity, serverity)
438  self.assertEqual(ev.SourceName, self.opc.get_server_node().get_browse_name().Name)
439  self.assertEqual(ev.SourceNode, self.opc.get_server_node().nodeid)
440  self.assertEqual(ev.Message.Text, msg)
441  self.assertEqual(ev.Time, tid)
442  self.assertEqual(ev.PropertyNum, propertynum)
443  self.assertEqual(ev.PropertyString, propertystring)
444 
445  # time.sleep(0.1)
446  sub.unsubscribe(handle)
447  sub.delete()
448 
450  objects = self.srv.get_objects_node()
451  o = objects.add_object(3, 'MyObject')
452  etype = self.srv.create_custom_event_type(2, 'MyEvent', ua.ObjectIds.BaseEventType, [('PropertyNum', ua.VariantType.Float), ('PropertyString', ua.VariantType.String)])
453  evgen = self.srv.get_event_generator(etype, o)
454 
455  myhandler = MySubHandler()
456  sub = self.opc.create_subscription(100, myhandler)
457  handle = sub.subscribe_events(o, etype)
458 
459  propertynum = 2
460  propertystring = "This is my test"
461  evgen.event.PropertyNum = propertynum
462  evgen.event.PropertyString = propertystring
463  tid = datetime.utcnow()
464  msg = b"this is my msg "
465  evgen.trigger(tid, msg)
466 
467  ev = myhandler.future.result(10)
468  self.assertIsNot(ev, None) # we did not receive event
469  self.assertEqual(ev.EventType, etype.nodeid)
470  self.assertEqual(ev.Severity, 1)
471  self.assertEqual(ev.SourceName, 'MyObject')
472  self.assertEqual(ev.SourceNode, o.nodeid)
473  self.assertEqual(ev.Message.Text, msg)
474  self.assertEqual(ev.Time, tid)
475  self.assertEqual(ev.PropertyNum, propertynum)
476  self.assertEqual(ev.PropertyString, propertystring)
477 
478  # time.sleep(0.1)
479  sub.unsubscribe(handle)
480  sub.delete()
481 
483  objects = self.srv.get_objects_node()
484  o = objects.add_object(3, 'MyObject')
485 
486  etype1 = self.srv.create_custom_event_type(2, 'MyEvent1', ua.ObjectIds.BaseEventType, [('PropertyNum', ua.VariantType.Float), ('PropertyString', ua.VariantType.String)])
487  evgen1 = self.srv.get_event_generator(etype1, o)
488 
489  etype2 = self.srv.create_custom_event_type(2, 'MyEvent2', ua.ObjectIds.BaseEventType, [('PropertyNum', ua.VariantType.Float), ('PropertyString', ua.VariantType.String)])
490  evgen2 = self.srv.get_event_generator(etype2, o)
491 
492  myhandler = MySubHandler2()
493  sub = self.opc.create_subscription(100, myhandler)
494  handle = sub.subscribe_events(o, etype1)
495 
496  propertynum1 = 1
497  propertystring1 = "This is my test 1"
498  evgen1.event.PropertyNum = propertynum1
499  evgen1.event.PropertyString = propertystring1
500 
501  propertynum2 = 2
502  propertystring2 = "This is my test 2"
503  evgen2.event.PropertyNum = propertynum2
504  evgen2.event.PropertyString = propertystring2
505 
506  for i in range(3):
507  evgen1.trigger()
508  evgen2.trigger()
509  time.sleep(1)
510 
511  self.assertEqual(len(myhandler.results), 3)
512  ev = myhandler.results[-1]
513  self.assertEqual(ev.EventType, etype1.nodeid)
514 
515  handle = sub.subscribe_events(o, etype2)
516  for i in range(4):
517  evgen1.trigger()
518  evgen2.trigger()
519  time.sleep(1)
520 
521 
522  ev1s = [ev for ev in myhandler.results if ev.EventType == etype1.nodeid]
523  ev2s = [ev for ev in myhandler.results if ev.EventType == etype2.nodeid]
524 
525  self.assertEqual(len(myhandler.results), 11)
526  self.assertEqual(len(ev2s), 4)
527  self.assertEqual(len(ev1s), 7)
528 
529  sub.unsubscribe(handle)
530  sub.delete()
531 
533  objects = self.srv.get_objects_node()
534  o = objects.add_object(3, 'MyObject')
535 
536  etype1 = self.srv.create_custom_event_type(2, 'MyEvent1', ua.ObjectIds.BaseEventType, [('PropertyNum', ua.VariantType.Float), ('PropertyString', ua.VariantType.String)])
537  evgen1 = self.srv.get_event_generator(etype1, o)
538 
539  etype2 = self.srv.create_custom_event_type(2, 'MyEvent2', ua.ObjectIds.BaseEventType, [('PropertyNum2', ua.VariantType.Float), ('PropertyString', ua.VariantType.String)])
540  evgen2 = self.srv.get_event_generator(etype2, o)
541 
542  etype3 = self.srv.create_custom_event_type(2, 'MyEvent3', ua.ObjectIds.BaseEventType, [('PropertyNum3', ua.VariantType.Float), ('PropertyString', ua.VariantType.String)])
543  evgen3 = self.srv.get_event_generator(etype3, o)
544 
545  myhandler = MySubHandler2()
546  sub = self.opc.create_subscription(100, myhandler)
547  handle = sub.subscribe_events(o, [etype1, etype3])
548 
549  propertynum1 = 1
550  propertystring1 = "This is my test 1"
551  evgen1.event.PropertyNum = propertynum1
552  evgen1.event.PropertyString = propertystring1
553 
554  propertynum2 = 2
555  propertystring2 = "This is my test 2"
556  evgen2.event.PropertyNum2 = propertynum2
557  evgen2.event.PropertyString = propertystring2
558 
559  propertynum3 = 3
560  propertystring3 = "This is my test 3"
561  evgen3.event.PropertyNum3 = propertynum3
562  evgen3.event.PropertyString = propertystring2
563 
564  for i in range(3):
565  evgen1.trigger()
566  evgen2.trigger()
567  evgen3.trigger()
568  evgen3.event.PropertyNum3 = 9999
569  evgen3.trigger()
570  time.sleep(1)
571 
572  ev1s = [ev for ev in myhandler.results if ev.EventType == etype1.nodeid]
573  ev2s = [ev for ev in myhandler.results if ev.EventType == etype2.nodeid]
574  ev3s = [ev for ev in myhandler.results if ev.EventType == etype3.nodeid]
575 
576  self.assertEqual(len(myhandler.results), 7)
577  self.assertEqual(len(ev1s), 3)
578  self.assertEqual(len(ev2s), 0)
579  self.assertEqual(len(ev3s), 4)
580  self.assertEqual(ev1s[0].PropertyNum, propertynum1)
581  self.assertEqual(ev3s[0].PropertyNum3, propertynum3)
582  self.assertEqual(ev3s[-1].PropertyNum3, 9999)
583  self.assertEqual(ev1s[0].PropertyNum3, None)
584 
585  sub.unsubscribe(handle)
586  sub.delete()
587 
def datachange_notification(self, node, val, data)
def datachange_notification(self, node, val, data)
def get_event_properties_from_type_node(node)
Definition: events.py:165
def datachange_notification(self, node, val, data)
def datachange_notification(self, node, val, data)


ros_opcua_impl_python_opcua
Author(s): Denis Štogl , Daniel Draper
autogenerated on Tue Jan 19 2021 03:12:44