00001 
00002 
# ROS package that owns this test, and the name this test is reported under.
PKG = 'openrtm_tools'
NAME = 'test_rtmlaunch'

import imp
try:
    # Probe whether PKG can already be located as a Python module.
    imp.find_module(PKG)
except ImportError:
    # PKG could not be located: fall back to roslib's manifest loading
    # (presumably this extends sys.path from the package manifest -- confirm
    # against the roslib documentation).
    import roslib
    roslib.load_manifest(PKG)
00011 
00012 import unittest, os, sys, time
00013 import subprocess
00014 from subprocess import call, check_output, Popen, PIPE, STDOUT
00015 
00016 import rtctree.tree
00017 
class TestRtmLaunch(unittest.TestCase):
    """Check the state and wiring of RT components found through rtctree.

    Every test looks its components up on the name server at
    ``localhost:2809`` and retries (20 attempts, one second apart) before
    asserting.  The components are presumably the ones started by the
    rtmlaunch sample launch file -- that file is not visible from here.
    """

    # Address of the name server every lookup goes through.
    NAMESERVER = 'localhost:2809'
    # Retry budget shared by all tests.
    MAX_ATTEMPTS = 20
    RETRY_INTERVAL = 1  # seconds slept after each unsuccessful attempt

    @staticmethod
    def _log(*fields):
        """Write ``fields`` to stderr, space separated, ending in a newline.

        Produces the same text as ``print >>sys.stderr, a, b, ...`` while
        being valid syntax on both Python 2 and Python 3.
        """
        sys.stderr.write(' '.join(str(field) for field in fields) + '\n')

    @staticmethod
    def _poll(probe, accept=None, attempts=20, interval=1):
        """Call ``probe()`` repeatedly until its result is accepted.

        :param probe: zero-argument callable; may raise while the system is
            not ready yet.
        :param accept: optional one-argument predicate; when omitted, any
            result returned without an exception is accepted.
        :param attempts: maximum number of calls to ``probe``.
        :param interval: seconds to sleep after each unsuccessful attempt.
        :returns: the accepted result, otherwise the most recent result that
            ``probe`` returned, or ``None`` if it never returned.
        """
        latest = None
        for _ in range(attempts):
            try:
                latest = probe()
                if accept is None or accept(latest):
                    break
            except Exception as err:
                # Only ordinary errors are retried; KeyboardInterrupt and
                # SystemExit propagate so the run can still be aborted.
                sys.stderr.write("Not ready yet: %r\n" % (err,))
            time.sleep(interval)
        return latest

    def _open_tree(self):
        """Build a fresh RTC tree from the name server."""
        return rtctree.tree.RTCTree(servers=self.NAMESERVER)

    def _node(self, tree, rtc_name):
        """Return the node for ``rtc_name`` directly under the name server."""
        return tree.get_node(['/', self.NAMESERVER, rtc_name])

    def _wait_until_active(self, rtc_name, label):
        """Poll ``rtc_name`` until its state is ACTIVE.

        :param rtc_name: component entry name, e.g. ``'Foo0.rtc'``.
        :param label: prefix written to stderr in front of each observation.
        :returns: the last component object observed, or ``None`` when no
            lookup ever succeeded.
        """
        def probe():
            component = self._node(self._open_tree(), rtc_name)
            # Raises AttributeError (and therefore retries) when the lookup
            # returned None.
            self._log(label, component, component.get_state_string())
            return component

        def is_active(component):
            return component.state == rtctree.component.Component.ACTIVE

        return self._poll(probe, accept=is_active,
                          attempts=self.MAX_ATTEMPTS,
                          interval=self.RETRY_INTERVAL)

    def _assert_active(self, component, title):
        """Fail with a readable message unless ``component`` is ACTIVE."""
        self.assertIsNotNone(
            component,
            "%s was not found on %s" % (title, self.NAMESERVER))
        self.assertEqual(
            component.state, rtctree.component.Component.ACTIVE,
            "State of %s is %s" % (title, component.state_string))

    def _wait_for_connections(self, first_rtc, second_rtc, port_names):
        """Poll until the two components are connected on every named port.

        For each name in ``port_names`` the port of that name on
        ``first_rtc`` is asked for its connection to the same-named port on
        ``second_rtc``.  Both components are taken from the same tree.

        :returns: list of connection objects, in ``port_names`` order.
        :raises AssertionError: if the connections never all appeared.
        """
        def probe():
            tree = self._open_tree()
            first = self._node(tree, first_rtc)
            second = self._node(tree, second_rtc)
            pairs = [(first.get_port_by_name(name),
                      second.get_port_by_name(name)) for name in port_names]
            found = [near.get_connection_by_dest(far) for near, far in pairs]
            for connection in found:
                # A missing connection is None; the attribute access then
                # raises and triggers another attempt.
                self._log("Connection : ", connection.properties)
            return found

        connections = self._poll(probe,
                                 attempts=self.MAX_ATTEMPTS,
                                 interval=self.RETRY_INTERVAL)
        self.assertIsNotNone(
            connections,
            "No connection between %s and %s on ports %s"
            % (first_rtc, second_rtc, port_names))
        return connections

    def test_provider_activated(self):
        """MyServiceProvider0 reaches the ACTIVE state."""
        provider = self._wait_until_active('MyServiceProvider0.rtc',
                                           "Provier : ")
        self._assert_active(provider, "Provider")

    def test_consumer_activated(self):
        """MyServiceConsumer0 reaches the ACTIVE state."""
        consumer = self._wait_until_active('MyServiceConsumer0.rtc',
                                           "Consumer : ")
        self._assert_active(consumer, "Consumer")

    def test_provider_and_consumer_connected(self):
        """The MyService ports of provider and consumer are connected."""
        connection, = self._wait_for_connections(
            'MyServiceProvider0.rtc', 'MyServiceConsumer0.rtc', ["MyService"])
        props = connection.properties
        self.assertEqual(props['port.port_type'], 'CorbaPort')
        self.assertEqual(props['dataport.subscription_type'], 'flush')

    def test_seqin_activated(self):
        """SequenceInComponent0 reaches the ACTIVE state."""
        seqin = self._wait_until_active('SequenceInComponent0.rtc',
                                        "SeqIn  : ")
        self._assert_active(seqin, "SeqIn")

    def test_seqout_activated(self):
        """SequenceOutComponent0 reaches the ACTIVE state."""
        seqout = self._wait_until_active('SequenceOutComponent0.rtc',
                                         "SeqOut : ")
        self._assert_active(seqout, "SeqOut")

    def test_seqin_and_seqout_connected(self):
        """Float and FloatSeq ports of SeqIn and SeqOut are connected."""
        # The far side of the connection is the SequenceOut component.
        connection1, connection2 = self._wait_for_connections(
            'SequenceInComponent0.rtc', 'SequenceOutComponent0.rtc',
            ["Float", "FloatSeq"])

        float_props = connection1.properties
        self.assertEqual(float_props['dataport.data_type'],
                         'IDL:RTC/TimedFloat:1.0')
        self.assertEqual(float_props['dataport.subscription_type'], 'new')
        self.assertEqual(float_props['dataport.publisher.push_policy'], 'all')

        seq_props = connection2.properties
        self.assertEqual(seq_props['dataport.data_type'],
                         'IDL:RTC/TimedFloatSeq:1.0')
        self.assertEqual(seq_props['dataport.subscription_type'], 'flush')
        # This connection is expected to carry no push policy at all.
        self.assertIsNone(seq_props.get('dataport.publisher.push_policy'))
00130 
00131 
00132 
if __name__ == '__main__':
    # rostest is imported only when this file is executed as a script.
    import rostest
    # Package name, test name, test case class and the raw command line.
    run_arguments = (PKG, NAME, TestRtmLaunch, sys.argv)
    rostest.run(*run_arguments)