Go to the documentation of this file.00001
00002
00003 PKG = 'rtmbuild_test'
00004
00005 import imp
00006 try:
00007 imp.find_module(PKG)
00008 except:
00009 import roslib; roslib.load_manifest(PKG)
00010
00011 import unittest, os, sys, time, rospy
00012
00013 from omniORB import CORBA, any, cdrUnmarshal, cdrMarshal
00014 import CosNaming
00015
00016 import OpenRTM_aist
00017 import RTC, OpenRTM, SDOPackage, RTM
00018 from RTC import *
00019
00020 rootnc = None
00021 orb = None
00022 nshost = 'localhost'
00023 nsport = 9999
00024
00025 def initCORBA() :
00026 global rootnc, orb
00027 print "configuration ORB with ", nshost, ":", nsport
00028 os.environ['ORBInitRef'] = 'NameService=corbaloc:iiop:{0}:{1}/NameService'.format(nshost,nsport)
00029
00030 try:
00031 orb = CORBA.ORB_init(sys.argv, CORBA.ORB_ID)
00032 nameserver = orb.resolve_initial_references("NameService");
00033 rootnc = nameserver._narrow(CosNaming.NamingContext)
00034 except omniORB.CORBA.ORB.InvalidName, e:
00035 sys.exit('[ERROR] Invalide Name (hostname={0}).\n'.format(nshost) +
00036 'Make sure the hostname is correct.\n' + str(e))
00037 except omniORB.CORBA.TRANSIENT, e:
00038 sys.exit('[ERROR] Connection Failed with the Nameserver (hostname={0} port={1}).\n'.format(nshost, nsport) +
00039 'Make sure the hostname is correct and the Nameserver is running.\n' + str(e))
00040 except Exception as e:
00041 print str(e)
00042
00043 return None
00044
00045 def findObject(name) :
00046 global rootnc
00047 nc = None
00048 while nc == None :
00049 try:
00050 nc = rootnc.resolve([CosNaming.NameComponent(name, "rtc")])
00051 except:
00052 print >>sys.stderr, "Waiting for %s ..."%name
00053 time.sleep(1)
00054 continue
00055 time.sleep(1)
00056 print >>sys.stderr, "Found for %s (%r)"%(name,nc)
00057 return nc
00058
00059 class TestCompileIDL(unittest.TestCase):
00060 @classmethod
00061 def setUpClass(self):
00062 global orb
00063
00064
00065 print >>sys.stderr, "initCORBA"
00066 initCORBA()
00067
00068
00069 print >>sys.stderr, "wait for MyServiceROSBridge"
00070 bridge = findObject("MyServiceROSBridge")._narrow(RTC.RTObject)
00071 print >>sys.stderr, "wait for MyServiceProvider"
00072 provider = findObject("MyServiceProvider0")._narrow(RTC.RTObject)
00073
00074 print >>sys.stderr, "connect components"
00075 inP = provider.get_ports()[0]
00076 outP = bridge.get_ports()[0]
00077 con_prof = RTC.ConnectorProfile("connector0", "", [outP, inP], [])
00078 inP.connect(con_prof)
00079
00080 print >>sys.stderr, "activate components"
00081 bridge.get_owned_contexts()[0].activate_component(bridge)
00082 provider.get_owned_contexts()[0].activate_component(provider)
00083
00084 orb.shutdown(1)
00085
00086
00087 rospy.init_node('test_compile_idl')
00088
00089
00090 def testEcho(self):
00091 sys.path.append("/tmp/test_compile_idl/devel/lib/python2.7/dist-packages")
00092 print sys.path
00093 import rtmbuild_test.srv
00094 rospy.logwarn("wait for service")
00095 rospy.wait_for_service("/MyServiceROSBridge/echo")
00096 echo = rospy.ServiceProxy("/MyServiceROSBridge/echo", rtmbuild_test.srv.SimpleService_MyService_echo)
00097 req = rtmbuild_test.srv.SimpleService_MyService_echo
00098 msg = "this is test data 123"
00099 rospy.logwarn("send request > " + msg)
00100 res = echo(msg)
00101 rospy.logwarn("check return < " + res.operation_return)
00102 self.assertTrue(res.operation_return == msg, "SimpleService.echo returns incorrect string")
00103
00104
00105 if __name__ == '__main__':
00106 import rostest
00107 rostest.rosrun(PKG, 'test_myservice_rosbridge', TestCompileIDL)