00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 PKG = 'rosjava'
00037 NAME = 'test_parameter_client'
00038 import roslib; roslib.load_manifest(PKG)
00039
00040 import roslib.names
00041
00042 from ros import rospy
00043 from ros import std_msgs
00044 from ros import rostest
00045
00046 import sys
00047 import time
00048 import unittest
00049
00050 from std_msgs.msg import String, Bool, Int64, Float64
00051 from rosjava_test_msgs.msg import Composite, CompositeA, CompositeB, TestArrays
00052
00053 class TestParameterClient(unittest.TestCase):
00054
00055 def setUp(self):
00056 rospy.init_node(NAME)
00057
00058 rospy.Subscriber('tilde', String, self.tilde_cb)
00059 rospy.Subscriber('string', String, self.string_cb)
00060 rospy.Subscriber('bool', Bool, self.bool_cb)
00061 rospy.Subscriber('list', TestArrays, self.list_cb)
00062 rospy.Subscriber('int', Int64, self.int_cb)
00063 rospy.Subscriber('float', Float64, self.float_cb)
00064 rospy.Subscriber('composite', Composite, self.composite_cb)
00065
00066 self.list_msg = self.tilde_msg = self.string_msg = \
00067 self.bool_msg = self.int_msg = self.float_msg = \
00068 self.composite_msg = None
00069 self.tests = ['string', 'int', 'bool', 'float', 'composite', 'list']
00070
00071 def string_cb(self, msg):
00072 self.string_msg = msg
00073 def int_cb(self, msg):
00074 self.int_msg = msg
00075 def bool_cb(self, msg):
00076 self.bool_msg = msg
00077 def float_cb(self, msg):
00078 self.float_msg = msg
00079 def composite_cb(self, msg):
00080 self.composite_msg = msg
00081 def list_cb(self, msg):
00082 self.list_msg = msg
00083 def tilde_cb(self, msg):
00084 self.tilde_msg = msg
00085
00086 def test_parameter_client_read(self):
00087 timeout_t = time.time() + 20.
00088 print "waiting for 20 seconds for client to load"
00089
00090 tests = self.tests
00091 msgs = [None]
00092 while any(1 for m in msgs if m is None) and \
00093 not rospy.is_shutdown() and \
00094 timeout_t > time.time():
00095 time.sleep(0.2)
00096 msgs = [getattr(self, t+'_msg') for t in tests]
00097
00098 print "msgs: %s"%(msgs)
00099
00100 self.failIf(timeout_t < time.time(), "timeout exceeded")
00101 self.failIf(rospy.is_shutdown(), "node shutdown")
00102 self.failIf(any(1 for m in msgs if m is None), "did not receive all expected messages: "+str(msgs))
00103
00104 ns = rospy.get_param('parameter_namespace')
00105 for t in tests:
00106 p_name = roslib.names.ns_join(ns, t)
00107 value = rospy.get_param(p_name, t)
00108 msg = getattr(self, "%s_msg"%(t))
00109 print "get param: %s"%(p_name)
00110 print "param value: %s"%(value)
00111 print "msg value: %s"%(msg)
00112 if t == 'composite':
00113 print "value", p_name, value
00114 m = Composite(CompositeA(**value['a']), CompositeB(**value['b']))
00115 self.assertAlmostEquals(m.a.x, msg.a.x)
00116 self.assertAlmostEquals(m.a.y, msg.a.y)
00117 self.assertAlmostEquals(m.a.z, msg.a.z)
00118 self.assertAlmostEquals(m.a.w, msg.a.w)
00119 self.assertAlmostEquals(m.b.x, msg.b.x)
00120 self.assertAlmostEquals(m.b.y, msg.b.y)
00121 self.assertAlmostEquals(m.b.z, msg.b.z)
00122 elif t == 'list':
00123 self.assertEquals(list(value), list(msg.int32_array))
00124 elif t == 'float':
00125 self.assertAlmostEquals(value, msg.data)
00126 else:
00127 self.assertEquals(value, msg.data)
00128
00129 def test_set_parameter(self):
00130
00131 ns_source = rospy.get_param('parameter_namespace')
00132 ns_target = rospy.get_param('target_namespace')
00133 tests = self.tests
00134 for t in tests:
00135 source_name = roslib.names.ns_join(ns_source, t)
00136 target_name = roslib.names.ns_join(ns_target, t)
00137 source_value = rospy.get_param(source_name)
00138 target_value = rospy.get_param(target_name)
00139 if t != 'float':
00140 self.assertEquals(source_value, target_value)
00141 else:
00142 self.assertAlmostEquals(source_value, target_value)
00143
00144 def test_tilde_parameter(self):
00145 timeout_t = time.time() + 20.
00146 print "waiting for 20 seconds for client to load"
00147 while self.tilde_msg is None and \
00148 not rospy.is_shutdown() and \
00149 timeout_t > time.time():
00150 time.sleep(0.2)
00151
00152 self.failIf(timeout_t < time.time(), "timeout exceeded")
00153 self.failIf(rospy.is_shutdown(), "node shutdown")
00154 self.failIf(self.tilde_msg is None)
00155
00156 self.assertEquals(rospy.get_param('param_client/tilde'), self.tilde_msg.data)
00157
00158 if __name__ == '__main__':
00159 import rostest
00160 rostest.run(PKG, NAME, TestParameterClient, sys.argv)