00001
00002
00003 PKG = 'openrtm_tools'
00004 NAME = 'test_rtmlaunch'
00005
00006 import imp
00007 try:
00008 imp.find_module(PKG)
00009 except:
00010 import roslib; roslib.load_manifest(PKG)
00011
00012 import unittest, os, sys, time
00013 import subprocess
00014 from subprocess import call, check_output, Popen, PIPE, STDOUT
00015
00016 import rtctree.tree
00017
00018 class TestRtmLaunch(unittest.TestCase):
00019
00020
00021 def test_provider_activated(self):
00022 provider = None
00023 count = 0
00024 while count < 20 :
00025 try:
00026 tree = rtctree.tree.RTCTree(servers='localhost:2809')
00027 provider = tree.get_node(['/', 'localhost:2809','MyServiceProvider0.rtc'])
00028 print >>sys.stderr, "Provier : ", provider, provider.get_state_string()
00029 if provider.state==rtctree.component.Component.ACTIVE:
00030 break
00031 except:
00032 pass
00033 time.sleep(1)
00034 count += 1
00035 self.assertTrue(provider.state==rtctree.component.Component.ACTIVE, "State of Provider is %s"%(provider.state_string))
00036
00037 def test_consumer_activated(self):
00038 count = 0
00039 while count < 20 :
00040 try:
00041 tree = rtctree.tree.RTCTree(servers='localhost:2809')
00042 consumer = tree.get_node(['/', 'localhost:2809','MyServiceConsumer0.rtc'])
00043 print >>sys.stderr, "Consumer : ", consumer, consumer.get_state_string()
00044 if consumer.state==rtctree.component.Component.ACTIVE:
00045 break
00046 except:
00047 pass
00048 time.sleep(1)
00049 count += 1
00050 self.assertTrue(consumer.state==rtctree.component.Component.ACTIVE, "State of Consumer is %s"%(consumer.state_string))
00051
00052 def test_provider_and_consumer_connected(self):
00053 count = 0
00054 while count < 20 :
00055 try:
00056 tree = rtctree.tree.RTCTree(servers='localhost:2809')
00057 provider = tree.get_node(['/', 'localhost:2809','MyServiceProvider0.rtc'])
00058 consumer = tree.get_node(['/', 'localhost:2809','MyServiceConsumer0.rtc'])
00059 provider_port = provider.get_port_by_name("MyService")
00060 consumer_port = consumer.get_port_by_name("MyService")
00061 connection = provider_port.get_connection_by_dest(consumer_port)
00062 print >>sys.stderr, "Connection : ", connection.properties
00063 break
00064 except:
00065 pass
00066 time.sleep(1)
00067 count += 1
00068 self.assertTrue(connection.properties['port.port_type']=='CorbaPort')
00069 self.assertTrue(connection.properties['dataport.subscription_type']=='flush')
00070
00071
00072 def test_seqin_activated(self):
00073 count = 0
00074 while count < 20 :
00075 try:
00076 tree = rtctree.tree.RTCTree(servers='localhost:2809')
00077 seqin = tree.get_node(['/', 'localhost:2809','SequenceInComponent0.rtc'])
00078 print >>sys.stderr, "SeqIn : ", seqin, seqin.get_state_string()
00079 if seqin.state==rtctree.component.Component.ACTIVE:
00080 break
00081 except:
00082 pass
00083 time.sleep(1)
00084 count += 1
00085 self.assertTrue(seqin.state==rtctree.component.Component.ACTIVE, "State of SeqIn is %s"%(seqin.state_string))
00086
00087 def test_seqout_activated(self):
00088 count = 0
00089 while count < 20 :
00090 try:
00091 tree = rtctree.tree.RTCTree(servers='localhost:2809')
00092 seqout = tree.get_node(['/', 'localhost:2809','SequenceOutComponent0.rtc'])
00093 print >>sys.stderr, "SeqOut : ", seqout, seqout.get_state_string()
00094 if seqout.state==rtctree.component.Component.ACTIVE:
00095 break
00096 except:
00097 pass
00098 time.sleep(1)
00099 count += 1
00100 self.assertTrue(seqout.state==rtctree.component.Component.ACTIVE, "State of SeqOut is %s"%(seqout.state_string))
00101
00102 def test_seqin_and_seqout_connected(self):
00103 count = 0
00104 while count < 20 :
00105 try:
00106 tree = rtctree.tree.RTCTree(servers='localhost:2809')
00107 seqin = tree.get_node(['/', 'localhost:2809','SequenceInComponent0.rtc'])
00108 seqout = tree.get_node(['/', 'localhost:2809','SequenceInComponent0.rtc'])
00109 seqin_port1 = seqin.get_port_by_name("Float")
00110 seqin_port2 = seqin.get_port_by_name("FloatSeq")
00111 seqout_port1 = seqout.get_port_by_name("Float")
00112 seqout_port2 = seqout.get_port_by_name("FloatSeq")
00113 connection1 = seqin_port1.get_connection_by_dest(seqout_port1)
00114 connection2 = seqin_port2.get_connection_by_dest(seqout_port2)
00115 print >>sys.stderr, "Connection : ", connection1.properties
00116 print >>sys.stderr, "Connection : ", connection2.properties
00117 break
00118 except:
00119 pass
00120 time.sleep(1)
00121 count += 1
00122
00123 self.assertTrue(connection1.properties['dataport.data_type']=='IDL:RTC/TimedFloat:1.0')
00124 self.assertTrue(connection1.properties['dataport.subscription_type']=='new')
00125 self.assertTrue(connection1.properties['dataport.publisher.push_policy']=='all')
00126
00127 self.assertTrue(connection2.properties['dataport.data_type']=='IDL:RTC/TimedFloatSeq:1.0')
00128 self.assertTrue(connection2.properties['dataport.subscription_type']=='flush')
00129 self.assertTrue(connection2.properties.get('dataport.publisher.push_policy')==None)
00130
00131
00132
00133 if __name__ == '__main__':
00134 import rostest
00135 rostest.run(PKG, NAME, TestRtmLaunch, sys.argv)