Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 import sys
00032 import threading
00033 import time
00034 import uuid
00035
00036 PKG = 'test_bond'
00037 import roslib; roslib.load_manifest(PKG)
00038 import rospy
00039 import bondpy
00040 from test_bond.srv import *
00041
00042 import unittest
00043 import rostest
00044
00045 import atexit
00046 atexit.register(rospy.signal_shutdown, 'exit')
00047
00048 def gen_id():
00049 return "test_" + str(uuid.uuid4())
00050 TOPIC = "test_bond_topic"
00051
00052 test_bond = rospy.ServiceProxy('test_bond', TestBond)
00053
00054 class Exerciser(unittest.TestCase):
00055
00056
00057 def test_normal(self):
00058 id = gen_id()
00059 test_bond.wait_for_service()
00060 test_bond(id = id, topic = TOPIC, delay_death = rospy.Duration(2.0))
00061 rospy.logerr("test_normal: test_bond service call just returned")
00062 self.bond = bond = bondpy.Bond(TOPIC, id)
00063 bond.start()
00064
00065 bond_start_time = time.time()
00066
00067 formed = bond.wait_until_formed(rospy.Duration(2.0))
00068 if not formed:
00069 s = ''
00070 s += "BONDFAIL\n"
00071 s += "wait_until_formed returned %s\n" % formed
00072 s += "Bond details:\n"
00073 s += " Started = %s\n" % bond._Bond__started
00074 s += " sister_instance_id = %s\n" % bond.sister_instance_id
00075 s += " is_shutdown = %s\n" % bond.is_shutdown
00076 s += " num connections = %d\n" % bond.pub.get_num_connections()
00077
00078 formed_later = bond.wait_until_formed(rospy.Duration(20.0))
00079 s += "Formed later: %s after %.3f seconds\n" % (formed_later, time.time() - bond_start_time)
00080
00081 rospy.logerr(s)
00082 self.fail(s)
00083
00084
00085 self.assertTrue(bond.wait_until_broken(rospy.Duration(5.0)))
00086
00087
00088 def test_no_connect(self):
00089 id = gen_id()
00090
00091 self.bond = bond = bondpy.Bond(TOPIC, id)
00092 bond.start()
00093 self.assertFalse(bond.wait_until_formed(rospy.Duration(1.0)))
00094 self.assertTrue(bond.wait_until_broken(rospy.Duration(20.0)))
00095
00096
00097 def test_heartbeat_timeout(self):
00098 id = gen_id()
00099 test_bond.wait_for_service()
00100 test_bond(id=id, topic=TOPIC, delay_death=rospy.Duration(2.0), inhibit_death_message=True)
00101 self.bond = bond = bondpy.Bond(TOPIC, id)
00102 bond.start()
00103
00104 self.assertTrue(bond.wait_until_formed(rospy.Duration(2.0)))
00105 self.assertTrue(bond.wait_until_broken(rospy.Duration(10.0)))
00106
00107
00108 def test_i_die(self):
00109 id = gen_id()
00110 test_bond.wait_for_service()
00111 test_bond(id = id, topic = TOPIC, delay_death = rospy.Duration(-1))
00112 self.bond = bond = bondpy.Bond(TOPIC, id)
00113 bond.start()
00114
00115 self.assertTrue(bond.wait_until_formed(rospy.Duration(2.0)))
00116
00117 bond.break_bond()
00118 self.assertTrue(bond.wait_until_broken(rospy.Duration(2.0)))
00119
00120
00121 def test_die_no_ack(self):
00122 id = gen_id()
00123 test_bond.wait_for_service()
00124 test_bond(id=id, topic=TOPIC, delay_death=rospy.Duration(-1), inhibit_death_message=True)
00125 self.bond = bond = bondpy.Bond(TOPIC, id)
00126 bond.start()
00127
00128 self.assertTrue(bond.wait_until_formed(rospy.Duration(2.0)))
00129
00130 bond.break_bond()
00131 self.assertTrue(bond.wait_until_broken(rospy.Duration(5.0)))
00132
00133
00134 def test_die_ignore_death(self):
00135 id = gen_id()
00136 test_bond.wait_for_service()
00137 test_bond(id=id, topic=TOPIC, delay_death=rospy.Duration(-1), inhibit_death=True)
00138 self.bond = bond = bondpy.Bond(TOPIC, id)
00139 bond.start()
00140
00141 self.assertTrue(bond.wait_until_formed(rospy.Duration(2.0)))
00142
00143 bond.break_bond()
00144 self.assertFalse(bond.wait_until_broken(rospy.Duration(1.0)))
00145 self.assertTrue(bond.wait_until_broken(rospy.Duration(10.0)))
00146
00147 def setUp(self):
00148 self.bond = None
00149 def tearDown(self):
00150 if self.bond:
00151 self.bond.shutdown()
00152 self.bond = None
00153
00154
00155 def main():
00156 rospy.init_node('exercise_bondpy', anonymous=True, disable_signals=True)
00157 rostest.run(PKG, 'exercise_bondpy', Exerciser, sys.argv)
00158
00159 if __name__ == '__main__': main()