Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 import sys
00032 import time
00033 import uuid
00034
00035 PKG = 'test_bond'
00036 import roslib; roslib.load_manifest(PKG)
00037 import rospy
00038 from bondpy import bondpy
00039 from test_bond.srv import TestBond
00040
00041 import unittest
00042 import rostest
00043
00044 import atexit
00045 atexit.register(rospy.signal_shutdown, 'exit')
00046
00047
00048 def gen_id():
00049 return "test_" + str(uuid.uuid4())
00050
00051
00052 TOPIC = "test_bond_topic"
00053
00054 test_bond = rospy.ServiceProxy('test_bond', TestBond)
00055
00056
00057 class Exerciser(unittest.TestCase):
00058
00059
00060 def test_normal(self):
00061 id = gen_id()
00062 test_bond.wait_for_service()
00063 test_bond(id=id, topic=TOPIC, delay_death=rospy.Duration(2.0))
00064 rospy.logerr("test_normal: test_bond service call just returned")
00065 self.bond = bond = bondpy.Bond(TOPIC, id)
00066 bond.start()
00067
00068 bond_start_time = time.time()
00069
00070 formed = bond.wait_until_formed(rospy.Duration(2.0))
00071 if not formed:
00072 s = ''
00073 s += "BONDFAIL\n"
00074 s += "wait_until_formed returned %s\n" % formed
00075 s += "Bond details:\n"
00076 s += " Started = %s\n" % bond._Bond__started
00077 s += " sister_instance_id = %s\n" % bond.sister_instance_id
00078 s += " is_shutdown = %s\n" % bond.is_shutdown
00079 s += " num connections = %d\n" % bond.pub.get_num_connections()
00080
00081 formed_later = bond.wait_until_formed(rospy.Duration(20.0))
00082 s += "Formed later: %s after %.3f seconds\n" % (
00083 formed_later, time.time() - bond_start_time)
00084
00085 rospy.logerr(s)
00086 self.fail(s)
00087
00088
00089 self.assertTrue(bond.wait_until_broken(rospy.Duration(5.0)))
00090
00091
00092 def test_no_connect(self):
00093 id = gen_id()
00094
00095 self.bond = bond = bondpy.Bond(TOPIC, id)
00096 bond.start()
00097 self.assertFalse(bond.wait_until_formed(rospy.Duration(1.0)))
00098 self.assertTrue(bond.wait_until_broken(rospy.Duration(20.0)))
00099
00100
00101 def test_heartbeat_timeout(self):
00102 id = gen_id()
00103 test_bond.wait_for_service()
00104 test_bond(id=id, topic=TOPIC, delay_death=rospy.Duration(2.0), inhibit_death_message=True)
00105 self.bond = bond = bondpy.Bond(TOPIC, id)
00106 bond.start()
00107
00108 self.assertTrue(bond.wait_until_formed(rospy.Duration(2.0)))
00109 self.assertTrue(bond.wait_until_broken(rospy.Duration(10.0)))
00110
00111
00112 def test_i_die(self):
00113 id = gen_id()
00114 test_bond.wait_for_service()
00115 test_bond(id=id, topic=TOPIC, delay_death=rospy.Duration(-1))
00116 self.bond = bond = bondpy.Bond(TOPIC, id)
00117 bond.start()
00118
00119 self.assertTrue(bond.wait_until_formed(rospy.Duration(2.0)))
00120
00121 bond.break_bond()
00122 self.assertTrue(bond.wait_until_broken(rospy.Duration(2.0)))
00123
00124
00125 def test_die_no_ack(self):
00126 id = gen_id()
00127 test_bond.wait_for_service()
00128 test_bond(id=id, topic=TOPIC, delay_death=rospy.Duration(-1), inhibit_death_message=True)
00129 self.bond = bond = bondpy.Bond(TOPIC, id)
00130 bond.start()
00131
00132 self.assertTrue(bond.wait_until_formed(rospy.Duration(2.0)))
00133
00134 bond.break_bond()
00135 self.assertTrue(bond.wait_until_broken(rospy.Duration(5.0)))
00136
00137
00138 def test_die_ignore_death(self):
00139 id = gen_id()
00140 test_bond.wait_for_service()
00141 test_bond(id=id, topic=TOPIC, delay_death=rospy.Duration(-1), inhibit_death=True)
00142 self.bond = bond = bondpy.Bond(TOPIC, id)
00143 bond.start()
00144
00145 self.assertTrue(bond.wait_until_formed(rospy.Duration(2.0)))
00146
00147 bond.break_bond()
00148 self.assertFalse(bond.wait_until_broken(rospy.Duration(1.0)))
00149 self.assertTrue(bond.wait_until_broken(rospy.Duration(10.0)))
00150
00151 def setUp(self):
00152 self.bond = None
00153
00154 def tearDown(self):
00155 if self.bond:
00156 self.bond.shutdown()
00157 self.bond = None
00158
00159
00160 def main():
00161 rospy.init_node('exercise_bondpy', anonymous=True, disable_signals=True)
00162 rostest.run(PKG, 'exercise_bondpy', Exerciser, sys.argv)
00163
00164
00165 if __name__ == '__main__':
00166 main()