test-rtmlaunch.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 PKG = 'openrtm_tools'
4 NAME = 'test_rtmlaunch'
5 
6 import imp ## for rosbuild
7 try:
8  imp.find_module(PKG)
9 except:
10  import roslib; roslib.load_manifest(PKG)
11 
12 import unittest, os, sys, time
13 import subprocess
14 from subprocess import call, check_output, Popen, PIPE, STDOUT
15 
16 import rtctree.tree
17 
18 class TestRtmLaunch(unittest.TestCase):
19 
20  # check if MyServiceProviderComp and MyServiceConsumerComp is activated and connected
22  provider = None
23  count = 0
24  while count < 20 :
25  try:
26  tree = rtctree.tree.RTCTree(servers='localhost:2809')
27  provider = tree.get_node(['/', 'localhost:2809','MyServiceProvider0.rtc'])
28  print >>sys.stderr, "Provier : ", provider, provider.get_state_string()
29  if provider.state==rtctree.component.Component.ACTIVE:
30  break
31  except:
32  pass
33  time.sleep(1)
34  count += 1
35  self.assertTrue(provider.state==rtctree.component.Component.ACTIVE, "State of Provider is %s"%(provider.state_string))
36 
38  count = 0
39  while count < 20 :
40  try:
41  tree = rtctree.tree.RTCTree(servers='localhost:2809')
42  consumer = tree.get_node(['/', 'localhost:2809','MyServiceConsumer0.rtc'])
43  print >>sys.stderr, "Consumer : ", consumer, consumer.get_state_string()
44  if consumer.state==rtctree.component.Component.ACTIVE:
45  break
46  except:
47  pass
48  time.sleep(1)
49  count += 1
50  self.assertTrue(consumer.state==rtctree.component.Component.ACTIVE, "State of Consumer is %s"%(consumer.state_string))
51 
53  count = 0
54  while count < 20 :
55  try:
56  tree = rtctree.tree.RTCTree(servers='localhost:2809')
57  provider = tree.get_node(['/', 'localhost:2809','MyServiceProvider0.rtc'])
58  consumer = tree.get_node(['/', 'localhost:2809','MyServiceConsumer0.rtc'])
59  provider_port = provider.get_port_by_name("MyService")
60  consumer_port = consumer.get_port_by_name("MyService")
61  connection = provider_port.get_connection_by_dest(consumer_port)
62  print >>sys.stderr, "Connection : ", connection.properties
63  break
64  except:
65  pass
66  time.sleep(1)
67  count += 1
68  self.assertTrue(connection.properties['port.port_type']=='CorbaPort')
69  self.assertTrue(connection.properties['dataport.subscription_type']=='flush')
70 
71  # check if SequenceInComponent0.rtc and SequenceOutComponent0.rtc ans activated and connected
73  count = 0
74  while count < 20 :
75  try:
76  tree = rtctree.tree.RTCTree(servers='localhost:2809')
77  seqin = tree.get_node(['/', 'localhost:2809','SequenceInComponent0.rtc'])
78  print >>sys.stderr, "SeqIn : ", seqin, seqin.get_state_string()
79  if seqin.state==rtctree.component.Component.ACTIVE:
80  break
81  except:
82  pass
83  time.sleep(1)
84  count += 1
85  self.assertTrue(seqin.state==rtctree.component.Component.ACTIVE, "State of SeqIn is %s"%(seqin.state_string))
86 
88  count = 0
89  while count < 20 :
90  try:
91  tree = rtctree.tree.RTCTree(servers='localhost:2809')
92  seqout = tree.get_node(['/', 'localhost:2809','SequenceOutComponent0.rtc'])
93  print >>sys.stderr, "SeqOut : ", seqout, seqout.get_state_string()
94  if seqout.state==rtctree.component.Component.ACTIVE:
95  break
96  except:
97  pass
98  time.sleep(1)
99  count += 1
100  self.assertTrue(seqout.state==rtctree.component.Component.ACTIVE, "State of SeqOut is %s"%(seqout.state_string))
101 
103  count = 0
104  while count < 20 :
105  try:
106  tree = rtctree.tree.RTCTree(servers='localhost:2809')
107  seqin = tree.get_node(['/', 'localhost:2809','SequenceInComponent0.rtc'])
108  seqout = tree.get_node(['/', 'localhost:2809','SequenceInComponent0.rtc'])
109  seqin_port1 = seqin.get_port_by_name("Float")
110  seqin_port2 = seqin.get_port_by_name("FloatSeq")
111  seqout_port1 = seqout.get_port_by_name("Float")
112  seqout_port2 = seqout.get_port_by_name("FloatSeq")
113  connection1 = seqin_port1.get_connection_by_dest(seqout_port1)
114  connection2 = seqin_port2.get_connection_by_dest(seqout_port2)
115  print >>sys.stderr, "Connection : ", connection1.properties
116  print >>sys.stderr, "Connection : ", connection2.properties
117  break
118  except:
119  pass
120  time.sleep(1)
121  count += 1
122 
123  self.assertTrue(connection1.properties['dataport.data_type']=='IDL:RTC/TimedFloat:1.0')
124  self.assertTrue(connection1.properties['dataport.subscription_type']=='new') # Float
125  self.assertTrue(connection1.properties['dataport.publisher.push_policy']=='all')
126 
127  self.assertTrue(connection2.properties['dataport.data_type']=='IDL:RTC/TimedFloatSeq:1.0')
128  self.assertTrue(connection2.properties['dataport.subscription_type']=='flush') # FloatSeq
129  self.assertTrue(connection2.properties.get('dataport.publisher.push_policy')==None)
130 
131 
132 #unittest.main()
133 if __name__ == '__main__':
134  import rostest
135  rostest.run(PKG, NAME, TestRtmLaunch, sys.argv)
def test_provider_and_consumer_connected(self)


openrtm_tools
Author(s): Kei Okada
autogenerated on Mon May 10 2021 02:30:55