test_highlevel.py
Go to the documentation of this file.
1 #! /usr/bin/env python
2 
3 import sys
4 sys.path.insert(0, "../../build/bin/")
5 import datetime
6 import unittest
7 from threading import Thread, Event
8 try:
9  from queue import Queue
10 except ImportError:
11  from Queue import Queue
12 import time
13 from threading import Condition
14 import opcua
15 
# TCP port of the server started in a background thread for the client tests.
port_num1 = 48410
# TCP port of the in-process server used by the server-side tests.
port_num2 = 48430
18 
class SubHandler(opcua.SubscriptionHandler):
    '''
    No-op subscription handler.

    Both callbacks discard whatever they are given; it is meant for tests
    that only need to create and delete a subscription.
    '''

    def data_change(self, handle, node, val, attr):
        '''Ignore a data-change notification.'''
        return None

    def event(self, handle, event):
        '''Ignore an event notification.'''
        return None
28 
class MySubHandler(opcua.SubscriptionHandler):
    '''
    Subscription handler that records the latest notification and wakes
    up waiting threads, so tests can block until a callback has arrived.

    setup() must be called before the handler is used: it creates the
    Condition and resets the recorded fields.
    '''

    def setup(self):
        '''
        Reset the recorded state and return the Condition object that the
        callbacks notify.
        '''
        self.cond = Condition()
        self.node = None
        self.handle = None
        self.attribute = None
        self.value = None
        self.ev = None
        return self.cond

    def data_change(self, handle, node, val, attr):
        '''Record the data-change notification and wake up all waiters.'''
        # Write the fields while holding the lock so that a thread holding
        # the condition never observes a half-updated notification.
        with self.cond:
            self.handle = handle
            self.node = node
            self.value = val
            self.attribute = attr
            self.cond.notify_all()

    def event(self, handle, event):
        '''Record the received event and wake up all waiters.'''
        with self.cond:
            self.ev = event
            self.cond.notify_all()
54 
55 class Unit(unittest.TestCase):
56  '''
57  Simple unit test that do not need to setup a server or a client
58  '''
59 
60  def test_equal_nodeid(self):
61  nid1 = opcua.NodeId(999, 2)
62  nid2 = opcua.NodeId(999, 2)
63  self.assertTrue(nid1==nid2)
64  self.assertTrue(id(nid1)!=id(nid2))
65 
66  def test_zero_nodeid(self):
67  self.assertEqual(opcua.NodeId(), opcua.NodeId(0,0))
68  self.assertEqual(opcua.NodeId(), opcua.NodeId('ns=0;i=0;'))
69 
70  def test_string_nodeid(self):
71  nid = opcua.NodeId('titi', 1)
72  self.assertEqual(nid.namespace_index, 1)
73  self.assertEqual(nid.identifier, 'titi')
74  self.assertTrue(nid.is_string)
75 
77  nid = opcua.NodeId(999, 2)
78  self.assertEqual(nid.namespace_index, 2)
79  self.assertEqual(nid.identifier, 999)
80  self.assertTrue(nid.is_integer)
81 
83  nid = opcua.NodeId('ns=2;s=PLC1.Manufacturer;')
84  self.assertEqual(nid.namespace_index, 2)
85  self.assertEqual(nid.identifier, 'PLC1.Manufacturer')
86 
87 
89  nid = opcua.NodeId('ns=2;s=PLC1.Manufacturer;')
90  self.assertEqual(str(nid), 'ns=2;s=PLC1.Manufacturer;')
91  self.assertEqual(repr(nid), 'ns=2;s=PLC1.Manufacturer;')
92 
94  qn = opcua.QualifiedName('qname', 2)
95  self.assertEqual(qn.namespace_index, 2)
96  self.assertEqual(qn.name, 'qname')
97  self.assertEqual(repr(qn), 'QualifiedName(2:qname)')
98  self.assertEqual(repr(qn), str(qn))
99  qn2 = opcua.QualifiedName(2,'qname')
100  self.assertEqual(qn,qn2)
101 
102  def test_datavalue(self):
103  dv = opcua.DataValue(123)
104  self.assertEqual(dv.value, 123)
105  dv = opcua.DataValue('abc')
106  self.assertEqual(dv.value, 'abc')
107  tnow = int(time.time())
108  dv.source_timestamp = opcua.DateTime.from_time_t(tnow)
109  self.assertEqual(dv.source_timestamp.to_time_t(), tnow)
110  dv = opcua.DataValue(True, opcua.VariantType.BOOLEAN)
111  self.assertEqual(dv.value, True)
112  self.assertEqual(type(dv.value), bool)
113 
115  ad=opcua.ApplicationDescription()
116  self.assertEqual(ad.type,opcua.ApplicationType.Server)
117  ad.discovery_urls=['a','b','c']
118  self.assertEqual(ad.discovery_urls,['a','b','c'])
119 
121  utp = opcua.UserTokenPolicy()
122  self.assertEqual(utp.token_type,opcua.UserTokenType.Anonymous)
123 
125  ed=opcua.EndpointDescription()
126  self.assertEqual(ed.security_mode, opcua.MessageSecurityMode.Invalid)
127  self.assertEqual(ed.security_level,0)
128  ed.server_description=opcua.ApplicationDescription()
129  self.assertEqual(ed.user_identify_tokens,[])
130  ed.user_identify_tokens = [opcua.UserTokenPolicy()]*3
131  self.assertEqual(len(ed.user_identify_tokens),3)
132 
134  rd=opcua.ReferenceDescription()
135  self.assertEqual(rd.browse_name,opcua.QualifiedName())
136  self.assertEqual(rd.is_forward,False)
137  self.assertEqual(rd.reference_type_id,opcua.NodeId())
138  self.assertEqual(rd.target_node_class,opcua.NodeClass.Unspecified)
139  self.assertEqual(rd.target_node_id,opcua.NodeId())
140  self.assertEqual(rd.target_node_type_definition,opcua.NodeId())
141 
143  avid=opcua.ReadValueId()
144  self.assertEqual(avid.node_id, opcua.NodeId())
145  self.assertEqual(avid.attribute_id, opcua.AttributeId())
146  self.assertEqual(avid.index_range, '')
147  self.assertEqual(avid.data_encoding, opcua.QualifiedName())
148 
149  def test_write_value(self):
150  wv=opcua.WriteValue()
151  self.assertEqual(wv.node_id, opcua.NodeId())
152  self.assertEqual(wv.attribute_id, opcua.AttributeId())
153  self.assertEqual(wv.index_range,'')
154  self.assertEqual(wv.value.value, None)
155 
156  def test_datetime(self):
157  tnow1 = int(time.time())
158  tnow = int((datetime.datetime.utcnow() - datetime.datetime(1970,1,1)).total_seconds())
159  self.assertEqual(tnow, tnow1) #if this one fails this is a system error, not freopcua
160 
161  dt = opcua.DateTime.from_time_t(tnow)
162  self.assertEqual(tnow, dt.to_time_t())
163 
164  pydt = dt.to_datetime()
165  self.assertEqual(tnow, int((pydt - datetime.datetime(1970,1,1)).total_seconds()))
166 
167  dt2 = opcua.DateTime(pydt)
168  #self.assertEqual(dt2, dt) #FIXME: not implemented
169  pydt2 = dt.to_datetime()
170  self.assertEqual(pydt, pydt2)
171 
173  event = opcua.Event()
174  txt = "This is string"
175  event.message = txt #message is of type LocalizedText
176  self.assertEqual(txt, event.message)
177 
178 
179 
class CommonTests(object):
    '''
    Tests that will be run twice, once on the server side and once on the
    client side, since we have been careful to have the exact same API on
    both sides.

    The concrete test case must provide self.opc (the object under test)
    and self.srv (used by test_events to trigger an event).

    NOTE(review): the def lines of most methods were missing from the
    extracted listing. Methods marked "name reconstructed" carry a name
    inferred from their body -- confirm against the upstream file.
    '''

    def _wait_until(self, cond, predicate, timeout):
        '''
        Wait on cond until predicate() is true or timeout seconds elapsed.

        The predicate is checked before the first wait, so a callback that
        fired before this call is not missed, and it is checked again
        after every wake-up. Returns True if the predicate became true and
        False on timeout.
        '''
        deadline = time.time() + timeout
        with cond:
            while not predicate():
                remaining = deadline - time.time()
                if remaining <= 0:
                    return False
                cond.wait(remaining)
        return True

    def test_root(self):
        root = self.opc.get_root_node()
        self.assertEqual(opcua.QualifiedName('Root', 0), root.get_browse_name())
        nid = opcua.NodeId(84, 0)
        self.assertEqual(nid, root.get_id())

    def test_objects(self):
        objects = self.opc.get_objects_node()
        self.assertEqual(opcua.QualifiedName('Objects', 0), objects.get_browse_name())
        nid = opcua.NodeId(85, 0)
        self.assertEqual(nid, objects.get_id())

    def test_add_nodes(self):
        objects = self.opc.get_objects_node()
        f = objects.add_folder(3, 'MyFolder')
        v = f.add_variable(3, 'MyVariable', 6)
        p = f.add_property(3, 'MyProperty', 10)
        children = f.get_children()
        self.assertIn(v, children)
        self.assertIn(p, children)

    def test_add_numeric_variable(self):  # name reconstructed
        objects = self.opc.get_objects_node()
        v = objects.add_variable('ns=3;i=888;', '3:numericnodefromstring', 99)
        nid = opcua.NodeId(888, 3)
        qn = opcua.QualifiedName('numericnodefromstring', 3)
        self.assertEqual(nid, v.get_id())
        self.assertEqual(qn, v.get_browse_name())

    def test_add_string_variable(self):  # name reconstructed
        objects = self.opc.get_objects_node()
        v = objects.add_variable('ns=3;s=stringid;', '3:stringnodefromstring', [68])
        nid = opcua.NodeId('stringid', 3)
        qn = opcua.QualifiedName('stringnodefromstring', 3)
        self.assertEqual(nid, v.get_id())
        self.assertEqual(qn, v.get_browse_name())

    def test_add_string_array_variable(self):
        objects = self.opc.get_objects_node()
        v = objects.add_variable('ns=3;s=stringarrayid;', '9:stringarray', ['l', 'b'])
        nid = opcua.NodeId('stringarrayid', 3)
        qn = opcua.QualifiedName('stringarray', 9)
        self.assertEqual(nid, v.get_id())
        self.assertEqual(qn, v.get_browse_name())
        val = v.get_value()
        self.assertEqual(['l', 'b'], val)

    def test_add_numeric_node(self):  # name reconstructed
        objects = self.opc.get_objects_node()
        nid = opcua.NodeId(9999, 3)
        qn = opcua.QualifiedName('AddNodeVar1', 3)
        v1 = objects.add_variable(nid, qn, 0)
        self.assertEqual(nid, v1.get_id())
        self.assertEqual(qn, v1.get_browse_name())

    def test_add_string_node(self):  # name reconstructed
        objects = self.opc.get_objects_node()
        qn = opcua.QualifiedName('AddNodeVar2', 3)
        nid = opcua.NodeId('AddNodeVar2Id', 3)
        v2 = objects.add_variable(nid, qn, 0)
        self.assertEqual(nid, v2.get_id())
        self.assertEqual(qn, v2.get_browse_name())

    def test_add_find_node(self):  # name reconstructed
        objects = self.opc.get_objects_node()
        o = objects.add_object('ns=2;i=101;', '2:AddFindObject')
        o2 = objects.get_child('2:AddFindObject')
        self.assertEqual(o, o2)

    def test_node_path(self):
        objects = self.opc.get_objects_node()
        o = objects.add_object('ns=2;i=105;', '2:NodePathObject')
        root = self.opc.get_root_node()
        o2 = root.get_child(['0:Objects', '2:NodePathObject'])
        self.assertEqual(o, o2)

    def test_add_read_node(self):  # name reconstructed
        objects = self.opc.get_objects_node()
        o = objects.add_object('ns=2;i=102;', '2:AddReadObject')
        nid = opcua.NodeId(102, 2)
        self.assertEqual(o.get_id(), nid)
        qn = opcua.QualifiedName('AddReadObject', 2)
        self.assertEqual(o.get_browse_name(), qn)

    def test_simple_value(self):
        o = self.opc.get_objects_node()
        v = o.add_variable(3, 'VariableTestValue', 4.32)
        val = v.get_value()
        self.assertEqual(4.32, val)

    def test_add_exception(self):  # name reconstructed
        objects = self.opc.get_objects_node()
        objects.add_object('ns=2;i=103;', '2:AddReadObject')
        # Adding a second node with the same node id must fail.
        with self.assertRaises(RuntimeError):
            objects.add_object('ns=2;i=103;', '2:AddReadObject')

    def test_negative_value(self):  # name reconstructed
        o = self.opc.get_objects_node()
        v = o.add_variable(3, 'VariableNegativeValue', 4)
        v.set_value(-4.54)
        val = v.get_value()
        self.assertEqual(-4.54, val)

    def test_read_server_state(self):  # name reconstructed
        statenode = self.opc.get_node(opcua.ObjectId.Server_ServerStatus_State)
        state = statenode.get_value()
        self.assertEqual(state, 0)

    def test_array_value(self):
        o = self.opc.get_objects_node()
        v = o.add_variable(3, 'VariableArrayValue', [1, 2, 3])
        val = v.get_value()
        self.assertEqual([1, 2, 3], val)

    def test_array_size_one_value(self):  # name reconstructed
        o = self.opc.get_objects_node()
        v = o.add_variable(3, 'VariableArrayValue', [1, 2, 3])
        v.set_value([1])
        val = v.get_value()
        self.assertEqual([1], val)

    def test_create_delete_subscription(self):
        o = self.opc.get_objects_node()
        v = o.add_variable(3, 'SubscriptionVariable', [1, 2, 3])
        # Use a local handler instead of the module global `sclt`, which
        # only exists when the file is run as a script.
        handler = SubHandler()
        sub = self.opc.create_subscription(100, handler)
        handle = sub.subscribe_data_change(v)
        time.sleep(0.1)
        sub.unsubscribe(handle)
        sub.delete()

    def test_subscribe_events(self):  # name reconstructed
        handler = SubHandler()
        sub = self.opc.create_subscription(100, handler)
        handle = sub.subscribe_events()
        #time.sleep(0.1)
        sub.unsubscribe(handle)
        sub.delete()

    def test_events(self):
        msclt = MySubHandler()
        cond = msclt.setup()
        sub = self.opc.create_subscription(100, msclt)
        handle = sub.subscribe_events()

        ev = opcua.Event()
        msg = "this is my msg "
        ev.message = msg
        tid = datetime.datetime.now()
        ev.time = tid
        ev.source_node = self.opc.get_server_node().get_id()
        ev.source_name = "our server node"
        ev.severity = 500
        self.srv.trigger_event(ev)

        # A bounded wait: a lost event fails the test instead of blocking
        # the run for hours.
        received = self._wait_until(cond, lambda: msclt.ev is not None, 5.0)
        self.assertTrue(received, "timed out waiting for the event callback")
        self.assertIsNot(msclt.ev, None)  # we did not receive event
        self.assertEqual(msclt.ev.message, msg)
        self.assertEqual(msclt.ev.time.to_datetime(), tid)
        self.assertEqual(msclt.ev.severity, 500)
        self.assertEqual(msclt.ev.source_node, self.opc.get_server_node().get_id())

        #time.sleep(0.1)
        sub.unsubscribe(handle)
        sub.delete()

    def test_get_namespace_index(self):  # name reconstructed
        idx = self.opc.get_namespace_index('http://freeopcua.github.io')
        self.assertEqual(idx, 1)

    def test_use_namespace(self):  # name reconstructed
        root = self.opc.get_root_node()
        idx = self.opc.get_namespace_index('http://freeopcua.github.io')
        o = root.add_object(idx, 'test_namespace')
        self.assertEqual(idx, o.get_id().namespace_index)
        o2 = root.get_child('{}:test_namespace'.format(idx))
        self.assertEqual(o, o2)

    def test_non_existing_node(self):  # name reconstructed
        root = self.opc.get_root_node()
        with self.assertRaises(RuntimeError):
            root.get_child(['0:Objects', '0:Server', '0:nonexistingnode'])

    def test_subscription_data_change(self):  # name reconstructed
        '''
        Test subscriptions. This is far too complicated for a unit test,
        but setting up subscriptions requires a lot of code, so once it is
        set up it is best to test as many things as possible.
        '''
        msclt = MySubHandler()
        cond = msclt.setup()

        o = self.opc.get_objects_node()

        # subscribe to a variable
        startv1 = [1, 2, 3]
        v1 = o.add_variable(3, 'SubscriptionVariableV1', startv1)
        sub = self.opc.create_subscription(100, msclt)
        handle1 = sub.subscribe_data_change(v1)

        # Now check we get the start value
        notified = self._wait_until(cond, lambda: msclt.value is not None, 0.5)
        self.assertTrue(notified, "timed out waiting for the subscription callback")
        self.assertEqual(msclt.value, startv1)
        self.assertEqual(msclt.node, v1)

        # modify v1 and check we get value
        v1.set_value([5])
        notified = self._wait_until(cond, lambda: msclt.value == [5], 0.5)
        self.assertTrue(notified, "timed out waiting for the subscription callback")
        self.assertEqual(msclt.node, v1)
        self.assertEqual(msclt.value, [5])

        sub.unsubscribe(handle1)
        sub.delete()

    def test_get_node_by_nodeid(self):  # name reconstructed
        root = self.opc.get_root_node()
        server_time_node = root.get_child(['0:Objects', '0:Server', '0:ServerStatus', '0:CurrentTime'])
        correct = self.opc.get_node(opcua.ObjectId.Server_ServerStatus_CurrentTime)
        self.assertEqual(server_time_node, correct)

    def test_subscribe_server_time(self):  # name reconstructed
        msclt = MySubHandler()
        cond = msclt.setup()

        server_time_node = self.opc.get_node(opcua.ObjectId.Server_ServerStatus_CurrentTime)

        sub = self.opc.create_subscription(200, msclt)
        handle = sub.subscribe_data_change(server_time_node)

        notified = self._wait_until(cond, lambda: msclt.node is not None, 0.5)
        self.assertTrue(notified, "timed out waiting for the subscription callback")
        self.assertEqual(msclt.node, server_time_node)
        # FIXME: Add test to verify server clock. How do I convert opcua.DateTime to python datetime?

        sub.unsubscribe(handle)
        sub.delete()

    def test_datetime_read(self):  # name reconstructed
        time_node = self.opc.get_node(opcua.ObjectId.Server_ServerStatus_CurrentTime)
        dt = time_node.get_value()
        pydt = dt.to_datetime()
        utcnow = datetime.datetime.utcnow()
        delta = utcnow - pydt
        self.assertTrue(delta < datetime.timedelta(seconds=1))

    def test_datetime_write(self):  # name reconstructed
        now = datetime.datetime.now()
        objects = self.opc.get_objects_node()
        v1 = objects.add_variable(4, "test_datetime", now)
        tid = v1.get_value()
        self.assertEqual(now, tid.to_datetime())
447 
448 
449 
450 
class ServerProcess(Thread):
    '''
    Run an opcua.Server in a background thread.

    Despite its name this is a threading.Thread, not a separate process.
    Events handed to trigger_event() are queued and fired from the server
    thread.
    '''

    def __init__(self):
        Thread.__init__(self)
        self._exit = Event()    # set by stop() to make run() return
        self.started = Event()  # set once the server has been started
        self._queue = Queue()   # events waiting to be triggered

    def run(self):
        '''
        Start the server, fire queued events until stop() is called, then
        stop the server.
        '''
        self.srv = opcua.Server()
        self.srv.set_endpoint('opc.tcp://localhost:%d' % port_num1)
        self.srv.start()
        self.started.set()
        try:
            while not self._exit.is_set():
                time.sleep(0.1)
                if not self._queue.empty():
                    ev = self._queue.get()
                    self.srv.trigger_event(ev)
        finally:
            # Stop the server even if triggering an event raised, so the
            # server does not keep running after the thread has died.
            self.srv.stop()

    def stop(self):
        '''Ask the server thread to shut down; returns without waiting.'''
        self._exit.set()

    def trigger_event(self, ev):
        '''Queue ev to be triggered on the server by the server thread.'''
        self._queue.put(ev)
478 
479 
class TestClient(unittest.TestCase, CommonTests):
    '''
    Run the common tests through a client connection.

    A server is needed for this, so the module-level ServerProcess
    (globalserver) is started first and the client connects to it.
    Tests that can only be run on the client side must be defined here.
    '''

    @classmethod
    def setUpClass(cls):
        # Start the shared background server and wait until it is up.
        cls.srv = globalserver
        cls.srv.start()
        cls.srv.started.wait()

        # Connect the client; the common tests go through it.
        cls.clt = opcua.Client()
        cls.clt.connect('opc.tcp://localhost:%d' % port_num1)
        cls.opc = cls.clt

    @classmethod
    def tearDownClass(cls):
        cls.clt.disconnect()
        cls.srv.stop()
        # Wait for the server to stop, otherwise a new one might be
        # started before this one is really gone.
        cls.srv.join()
508 
509 
class TestServer(unittest.TestCase, CommonTests):
    '''
    Run the common tests directly on a server object.
    Tests that can only be run on the server side must be defined here.

    NOTE(review): the def lines of the two namespace tests were missing
    from the extracted listing; their names are reconstructed from the
    bodies -- confirm against the upstream file.
    '''

    @classmethod
    def setUpClass(cls):
        cls.srv = opcua.Server()
        cls.srv.set_endpoint('opc.tcp://localhost:%d' % port_num2)
        cls.srv.start()
        cls.opc = cls.srv

    @classmethod
    def tearDownClass(cls):
        cls.srv.stop()

    def test_register_namespace(self):  # name reconstructed
        uri = 'http://mycustom.namespace.com'
        idx1 = self.opc.register_namespace(uri)
        idx2 = self.opc.get_namespace_index(uri)
        self.assertEqual(idx1, idx2)

    def test_register_use_namespace(self):  # name reconstructed
        uri = 'http://my_very_custom.namespace.com'
        idx = self.opc.register_namespace(uri)
        root = self.opc.get_root_node()
        myvar = root.add_variable(idx, 'var_in_custom_namespace', [5])
        myid = myvar.get_id()
        self.assertEqual(idx, myid.namespace_index)
        #self.assertEqual(uri, myid.namespace_uri) #FIXME: should return uri!!!
540 
541 
542 
543 
if __name__ == '__main__':
    # Only created here; the client tests start it in their setUpClass.
    globalserver = ServerProcess()
    try:
        # Shared no-op handler read as a module global by the tests.
        sclt = SubHandler()
        unittest.main(verbosity=3)
    finally:
        # Ask the server thread to exit however the test run ended.
        globalserver.stop()
551 
def test_user_token_policy(self)
def event(self, handle, event)
def test_qualifiedstring_nodeid(self)
def test_numeric_nodeid(self)
std::string format(CStringRef format_str, ArgList args)
Definition: format.h:3686
def test_equal_nodeid(self)
def test_localized_text(self)
def event(self, handle, event)
def test_write_value(self)
def data_change(self, handle, node, val, attr)
def test_add_string_array_variable(self)
def test_qualified_name(self)
def test_endpoint_description(self)
def test_strrepr_nodeid(self)
def test_create_delete_subscription(self)
def test_zero_nodeid(self)
def data_change(self, handle, node, val, attr)
def test_application_description(self)
def test_attribute_valueid(self)
def test_string_nodeid(self)
def test_reference_description(self)


ros_opcua_impl_freeopcua
Author(s): Denis Štogl
autogenerated on Tue Jan 19 2021 03:12:08