test-compile-idl.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 PKG = 'rtmbuild_test'
4 
5 import imp ## for rosbuild
6 try:
7  imp.find_module(PKG)
8 except:
9  import roslib; roslib.load_manifest(PKG)
10 
11 import unittest, os, sys, time, rospy
12 
13 from omniORB import CORBA, any, cdrUnmarshal, cdrMarshal
14 import CosNaming
15 
16 import OpenRTM_aist
17 import RTC, OpenRTM, SDOPackage, RTM
18 from RTC import *
19 
20 rootnc = None
21 orb = None
22 nshost = 'localhost'
23 nsport = 9999
24 
25 def initCORBA() :
26  global rootnc, orb
27  print "configuration ORB with ", nshost, ":", nsport
28  os.environ['ORBInitRef'] = 'NameService=corbaloc:iiop:{0}:{1}/NameService'.format(nshost,nsport)
29 
30  try:
31  orb = CORBA.ORB_init(sys.argv, CORBA.ORB_ID)
32  nameserver = orb.resolve_initial_references("NameService");
33  rootnc = nameserver._narrow(CosNaming.NamingContext)
34  except omniORB.CORBA.ORB.InvalidName, e:
35  sys.exit('[ERROR] Invalide Name (hostname={0}).\n'.format(nshost) +
36  'Make sure the hostname is correct.\n' + str(e))
37  except omniORB.CORBA.TRANSIENT, e:
38  sys.exit('[ERROR] Connection Failed with the Nameserver (hostname={0} port={1}).\n'.format(nshost, nsport) +
39  'Make sure the hostname is correct and the Nameserver is running.\n' + str(e))
40  except Exception as e:
41  print str(e)
42 
43  return None
44 
45 def findObject(name) :
46  global rootnc
47  nc = None
48  while nc == None :
49  try:
50  nc = rootnc.resolve([CosNaming.NameComponent(name, "rtc")])
51  except:
52  print >>sys.stderr, "Waiting for %s ..."%name
53  time.sleep(1)
54  continue
55  time.sleep(1)
56  print >>sys.stderr, "Found for %s (%r)"%(name,nc)
57  return nc
58 
59 class TestCompileIDL(unittest.TestCase):
60  @classmethod
61  def setUpClass(self):
62  global orb
63 
64  # run opnrtm
65  print >>sys.stderr, "initCORBA"
66  initCORBA()
67 
68  # wait for MyServiceROSBridge
69  print >>sys.stderr, "wait for MyServiceROSBridge"
70  bridge = findObject("MyServiceROSBridge")._narrow(RTC.RTObject)
71  print >>sys.stderr, "wait for MyServiceProvider"
72  provider = findObject("MyServiceProvider0")._narrow(RTC.RTObject)
73  # connect
74  print >>sys.stderr, "connect components"
75  inP = provider.get_ports()[0]
76  outP = bridge.get_ports()[0]
77  con_prof = RTC.ConnectorProfile("connector0", "", [outP, inP], [])
78  inP.connect(con_prof)
79  # activate
80  print >>sys.stderr, "activate components"
81  bridge.get_owned_contexts()[0].activate_component(bridge)
82  provider.get_owned_contexts()[0].activate_component(provider)
83 
84  orb.shutdown(1)
85 
86  # run ros
87  rospy.init_node('test_compile_idl')
88 
89 
90  def testEcho(self):
91  sys.path.append("/tmp/test_compile_idl/devel/lib/python2.7/dist-packages")
92  print sys.path
93  import rtmbuild_test.srv
94  rospy.logwarn("wait for service")
95  rospy.wait_for_service("/MyServiceROSBridge/echo")
96  echo = rospy.ServiceProxy("/MyServiceROSBridge/echo", rtmbuild_test.srv.SimpleService_MyService_echo)
97  req = rtmbuild_test.srv.SimpleService_MyService_echo
98  msg = "this is test data 123"
99  rospy.logwarn("send request > " + msg)
100  res = echo(msg)
101  rospy.logwarn("check return < " + res.operation_return)
102  self.assertTrue(res.operation_return == msg, "SimpleService.echo returns incorrect string")
103 
104 # unittest.main()
105 if __name__ == '__main__':
106  import rostest
107  rostest.rosrun(PKG, 'test_myservice_rosbridge', TestCompileIDL)


rtmbuild
Author(s): Kei Okada , Manabu Saito
autogenerated on Mon May 10 2021 02:31:01