# Test name passed to rostest.run() in the __main__ guard of this file.
NAME = 'test_rtmlaunch'
# NOTE(review): PKG is used below but its definition is not visible in this
# view, and the listing lines between NAME and this import are missing --
# confirm whether this call was originally nested inside a conditional.
import roslib; roslib.load_manifest(PKG)
import unittest, os, sys, time
from subprocess import call, check_output, Popen, PIPE, STDOUT
# Fragment that looks up 'MyServiceProvider0.rtc' under the 'localhost:2809'
# server, logs the node and its state string to stderr, and asserts that the
# component state is ACTIVE.
# NOTE(review): presumably the body of test_provider_activated (the name appears
# in the signature list at the end of this listing) -- the `def` line is not
# visible here. The leading integers are line numbers from the listing this
# text was extracted from, not Python code.
# NOTE(review): "Provier" in the log label looks like a typo for "Provider".
26 tree = rtctree.tree.RTCTree(servers=
'localhost:2809')
27 provider = tree.get_node([
'/',
'localhost:2809',
'MyServiceProvider0.rtc'])
28 print >>sys.stderr,
"Provier : ", provider, provider.get_state_string()
# NOTE(review): the body of this `if` (listing lines 30-34) is missing from
# this view, so what happens when the component is already ACTIVE is unknown.
29 if provider.state==rtctree.component.Component.ACTIVE:
# Final check; the message reports the provider's state string on failure.
35 self.assertTrue(provider.state==rtctree.component.Component.ACTIVE,
"State of Provider is %s"%(provider.state_string))
# Fragment that looks up 'MyServiceConsumer0.rtc' under the 'localhost:2809'
# server, logs the node and its state string to stderr, and asserts that the
# component state is ACTIVE.
# NOTE(review): presumably the body of test_consumer_activated (the name appears
# in the signature list at the end of this listing) -- the `def` line is not
# visible here. The leading integers are line numbers from the listing this
# text was extracted from, not Python code.
41 tree = rtctree.tree.RTCTree(servers=
'localhost:2809')
42 consumer = tree.get_node([
'/',
'localhost:2809',
'MyServiceConsumer0.rtc'])
43 print >>sys.stderr,
"Consumer : ", consumer, consumer.get_state_string()
# NOTE(review): the body of this `if` (listing lines 45-49) is missing from
# this view, so what happens when the component is already ACTIVE is unknown.
44 if consumer.state==rtctree.component.Component.ACTIVE:
# Final check; the message reports the consumer's state string on failure.
50 self.assertTrue(consumer.state==rtctree.component.Component.ACTIVE,
"State of Consumer is %s"%(consumer.state_string))
# Fragment that checks the "MyService" connection between the provider and
# consumer components registered under the 'localhost:2809' server.
# NOTE(review): presumably the body of test_provider_and_consumer_connected;
# its `def` line and any enclosing control flow are not visible in this view,
# so the statements are laid out flat here.
tree = rtctree.tree.RTCTree(servers='localhost:2809')
provider = tree.get_node(['/', 'localhost:2809', 'MyServiceProvider0.rtc'])
consumer = tree.get_node(['/', 'localhost:2809', 'MyServiceConsumer0.rtc'])
provider_port = provider.get_port_by_name("MyService")
consumer_port = consumer.get_port_by_name("MyService")
# Look the connection up from the provider side, using the consumer port as
# the destination.
connection = provider_port.get_connection_by_dest(consumer_port)
print >>sys.stderr, "Connection : ", connection.properties
# NOTE(review): listing lines 63-67 are missing from this view; whatever sat
# between the lookup above and the assertions below is not reproduced.
# assertEqual (rather than assertTrue(a == b)) reports the actual property
# value when the check fails.
self.assertEqual(connection.properties['port.port_type'], 'CorbaPort')
self.assertEqual(connection.properties['dataport.subscription_type'], 'flush')
# Fragment that looks up 'SequenceInComponent0.rtc' under the 'localhost:2809'
# server, logs the node and its state string to stderr, and asserts that the
# component state is ACTIVE.
# NOTE(review): presumably the body of test_seqin_activated (the name appears
# in the signature list at the end of this listing) -- the `def` line is not
# visible here. The leading integers are line numbers from the listing this
# text was extracted from, not Python code.
76 tree = rtctree.tree.RTCTree(servers=
'localhost:2809')
77 seqin = tree.get_node([
'/',
'localhost:2809',
'SequenceInComponent0.rtc'])
78 print >>sys.stderr,
"SeqIn : ", seqin, seqin.get_state_string()
# NOTE(review): the body of this `if` (listing lines 80-84) is missing from
# this view, so what happens when the component is already ACTIVE is unknown.
79 if seqin.state==rtctree.component.Component.ACTIVE:
# Final check; the message reports the component's state string on failure.
85 self.assertTrue(seqin.state==rtctree.component.Component.ACTIVE,
"State of SeqIn is %s"%(seqin.state_string))
# Fragment that looks up 'SequenceOutComponent0.rtc' under the 'localhost:2809'
# server, logs the node and its state string to stderr, and asserts that the
# component state is ACTIVE.
# NOTE(review): presumably the body of test_seqout_activated (the name appears
# in the signature list at the end of this listing) -- the `def` line is not
# visible here. The leading integers are line numbers from the listing this
# text was extracted from, not Python code.
91 tree = rtctree.tree.RTCTree(servers=
'localhost:2809')
92 seqout = tree.get_node([
'/',
'localhost:2809',
'SequenceOutComponent0.rtc'])
93 print >>sys.stderr,
"SeqOut : ", seqout, seqout.get_state_string()
# NOTE(review): the body of this `if` (listing lines 95-99) is missing from
# this view, so what happens when the component is already ACTIVE is unknown.
94 if seqout.state==rtctree.component.Component.ACTIVE:
# Final check; the message reports the component's state string on failure.
100 self.assertTrue(seqout.state==rtctree.component.Component.ACTIVE,
"State of SeqOut is %s"%(seqout.state_string))
# Fragment that checks the "Float" and "FloatSeq" connections between the
# SequenceIn and SequenceOut components registered under 'localhost:2809'.
# NOTE(review): presumably the body of test_seqin_and_seqout_connected; its
# `def` line and any enclosing control flow are not visible in this view, so
# the statements are laid out flat here.
tree = rtctree.tree.RTCTree(servers='localhost:2809')
seqin = tree.get_node(['/', 'localhost:2809', 'SequenceInComponent0.rtc'])
# Fix: this previously fetched 'SequenceInComponent0.rtc' a second time, so
# both ends of each lookup were ports of the same component and the
# SequenceOut component was never involved in the check.
seqout = tree.get_node(['/', 'localhost:2809', 'SequenceOutComponent0.rtc'])
seqin_port1 = seqin.get_port_by_name("Float")
seqin_port2 = seqin.get_port_by_name("FloatSeq")
seqout_port1 = seqout.get_port_by_name("Float")
seqout_port2 = seqout.get_port_by_name("FloatSeq")
# Look each connection up from the SequenceIn side, using the matching
# SequenceOut port as the destination.
connection1 = seqin_port1.get_connection_by_dest(seqout_port1)
connection2 = seqin_port2.get_connection_by_dest(seqout_port2)
print >>sys.stderr, "Connection : ", connection1.properties
print >>sys.stderr, "Connection : ", connection2.properties
# NOTE(review): listing lines 117-122 are missing from this view; whatever sat
# between the lookups above and the assertions below is not reproduced.
# assertEqual / assertIsNone (rather than assertTrue(a == b)) report the
# actual property value when a check fails.
self.assertEqual(connection1.properties['dataport.data_type'], 'IDL:RTC/TimedFloat:1.0')
self.assertEqual(connection1.properties['dataport.subscription_type'], 'new')
self.assertEqual(connection1.properties['dataport.publisher.push_policy'], 'all')

self.assertEqual(connection2.properties['dataport.data_type'], 'IDL:RTC/TimedFloatSeq:1.0')
self.assertEqual(connection2.properties['dataport.subscription_type'], 'flush')
# The second connection is expected to carry no push_policy entry at all.
self.assertIsNone(connection2.properties.get('dataport.publisher.push_policy'))
# Script entry point: hand the TestRtmLaunch test case to rostest.
if __name__ == '__main__':
    # NOTE(review): listing line 134 is missing from this view; `rostest` is
    # not imported anywhere in the visible text -- confirm it is imported there.
    rostest.run(PKG, NAME, TestRtmLaunch, sys.argv)
def test_consumer_activated(self)
def test_seqout_activated(self)
def test_seqin_activated(self)
def test_provider_and_consumer_connected(self)
def test_seqin_and_seqout_connected(self)
def test_provider_activated(self)