test_parameter_client.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # Software License Agreement (BSD License)
00003 #
00004 # Copyright (c) 2011, Willow Garage, Inc.
00005 # All rights reserved.
00006 #
00007 # Redistribution and use in source and binary forms, with or without
00008 # modification, are permitted provided that the following conditions
00009 # are met:
00010 #
00011 #  * Redistributions of source code must retain the above copyright
00012 #    notice, this list of conditions and the following disclaimer.
00013 #  * Redistributions in binary form must reproduce the above
00014 #    copyright notice, this list of conditions and the following
00015 #    disclaimer in the documentation and/or other materials provided
00016 #    with the distribution.
00017 #  * Neither the name of Willow Garage, Inc. nor the names of its
00018 #    contributors may be used to endorse or promote products derived
00019 #    from this software without specific prior written permission.
00020 #
00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 # POSSIBILITY OF SUCH DAMAGE.
00033 #
00034 # Revision $Id: test_empty_service.py 3803 2009-02-11 02:04:39Z rob_wheeler $
00035 
00036 PKG = 'rosjava'
00037 NAME = 'test_parameter_client'
00038 import roslib; roslib.load_manifest(PKG)
00039 
00040 import roslib.names
00041 
00042 from ros import rospy
00043 from ros import std_msgs
00044 from ros import rostest
00045 
00046 import sys
00047 import time
00048 import unittest
00049 
00050 from std_msgs.msg import String, Bool, Int64, Float64
00051 from rosjava_test_msgs.msg import Composite, CompositeA, CompositeB, TestArrays
00052 
00053 class TestParameterClient(unittest.TestCase):
00054         
00055     def setUp(self):
00056         rospy.init_node(NAME)
00057 
00058         rospy.Subscriber('tilde', String, self.tilde_cb)
00059         rospy.Subscriber('string', String, self.string_cb)
00060         rospy.Subscriber('bool', Bool, self.bool_cb)
00061         rospy.Subscriber('list', TestArrays, self.list_cb)
00062         rospy.Subscriber('int', Int64, self.int_cb)
00063         rospy.Subscriber('float', Float64, self.float_cb)
00064         rospy.Subscriber('composite', Composite, self.composite_cb)
00065         
00066         self.list_msg = self.tilde_msg = self.string_msg = \
00067                         self.bool_msg = self.int_msg = self.float_msg = \
00068                         self.composite_msg = None
00069         self.tests = ['string', 'int', 'bool', 'float', 'composite', 'list']
00070         
00071     def string_cb(self, msg):
00072         self.string_msg = msg
00073     def int_cb(self, msg):
00074         self.int_msg = msg
00075     def bool_cb(self, msg):
00076         self.bool_msg = msg
00077     def float_cb(self, msg):
00078         self.float_msg = msg
00079     def composite_cb(self, msg):
00080         self.composite_msg = msg
00081     def list_cb(self, msg):
00082         self.list_msg = msg
00083     def tilde_cb(self, msg):
00084         self.tilde_msg = msg
00085 
00086     def test_parameter_client_read(self):
00087         timeout_t = time.time() + 20.
00088         print "waiting for 20 seconds for client to load"
00089 
00090         tests = self.tests
00091         msgs = [None]
00092         while any(1 for m in msgs if m is None) and \
00093                 not rospy.is_shutdown() and \
00094                 timeout_t > time.time():
00095             time.sleep(0.2)
00096             msgs = [getattr(self, t+'_msg') for t in tests]
00097 
00098         print "msgs: %s"%(msgs)
00099         
00100         self.failIf(timeout_t < time.time(), "timeout exceeded")
00101         self.failIf(rospy.is_shutdown(), "node shutdown")            
00102         self.failIf(any(1 for m in msgs if m is None), "did not receive all expected messages: "+str(msgs))
00103 
00104         ns = rospy.get_param('parameter_namespace')
00105         for t in tests:
00106             p_name = roslib.names.ns_join(ns, t)
00107             value = rospy.get_param(p_name, t)
00108             msg = getattr(self, "%s_msg"%(t))
00109             print "get param: %s"%(p_name)
00110             print "param value: %s"%(value)
00111             print "msg value: %s"%(msg)
00112             if t == 'composite':
00113                 print "value", p_name, value
00114                 m = Composite(CompositeA(**value['a']), CompositeB(**value['b']))
00115                 self.assertAlmostEquals(m.a.x, msg.a.x)
00116                 self.assertAlmostEquals(m.a.y, msg.a.y)
00117                 self.assertAlmostEquals(m.a.z, msg.a.z)
00118                 self.assertAlmostEquals(m.a.w, msg.a.w)
00119                 self.assertAlmostEquals(m.b.x, msg.b.x)
00120                 self.assertAlmostEquals(m.b.y, msg.b.y)
00121                 self.assertAlmostEquals(m.b.z, msg.b.z)
00122             elif t == 'list':
00123                 self.assertEquals(list(value), list(msg.int32_array))
00124             elif t == 'float':
00125                 self.assertAlmostEquals(value, msg.data)
00126             else:
00127                 self.assertEquals(value, msg.data)
00128 
00129     def test_set_parameter(self):
00130         # make sure client copied each parameter correct
00131         ns_source = rospy.get_param('parameter_namespace')
00132         ns_target = rospy.get_param('target_namespace')
00133         tests = self.tests
00134         for t in tests:
00135             source_name = roslib.names.ns_join(ns_source, t)
00136             target_name = roslib.names.ns_join(ns_target, t)
00137             source_value = rospy.get_param(source_name)
00138             target_value = rospy.get_param(target_name)
00139             if t != 'float':
00140                 self.assertEquals(source_value, target_value)
00141             else:
00142                 self.assertAlmostEquals(source_value, target_value)
00143         
00144     def test_tilde_parameter(self):
00145         timeout_t = time.time() + 20.
00146         print "waiting for 20 seconds for client to load"
00147         while self.tilde_msg is None and \
00148                 not rospy.is_shutdown() and \
00149                 timeout_t > time.time():
00150             time.sleep(0.2)
00151 
00152         self.failIf(timeout_t < time.time(), "timeout exceeded")
00153         self.failIf(rospy.is_shutdown(), "node shutdown")            
00154         self.failIf(self.tilde_msg is None)
00155         
00156         self.assertEquals(rospy.get_param('param_client/tilde'), self.tilde_msg.data)
00157 
00158 if __name__ == '__main__':
00159     import rostest
00160     rostest.run(PKG, NAME, TestParameterClient, sys.argv)


rosjava_core
Author(s):
autogenerated on Wed Aug 26 2015 16:06:49