00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 import sys
00032 import threading
00033 import time
00034 import uuid
00035
00036 PKG = 'test_bond'
00037 import roslib; roslib.load_manifest(PKG)
00038 import rospy
00039 import bondpy
00040 from test_bond.srv import *
00041
00042 import unittest
00043 import rostest
00044
00045 import atexit
00046 atexit.register(rospy.signal_shutdown, 'exit')
00047
00048 def gen_id():
00049 return "test_" + str(uuid.uuid4())
00050 TOPIC = "test_bond_topic"
00051
00052 test_bond = rospy.ServiceProxy('test_bond', TestBond)
00053
00054 class Exerciser(unittest.TestCase):
00055
00056
00057 def test_normal(self):
00058 id = gen_id()
00059 test_bond.wait_for_service()
00060 test_bond(id = id, topic = TOPIC, delay_death = rospy.Duration(2.0))
00061 bond = bondpy.Bond(TOPIC, id)
00062 bond.start()
00063
00064 self.assertTrue(bond.wait_until_formed(rospy.Duration(2.0)))
00065 self.assertTrue(bond.wait_until_broken(rospy.Duration(5.0)))
00066
00067
00068 def test_no_connect(self):
00069 id = gen_id()
00070
00071 bond = bondpy.Bond(TOPIC, id)
00072 bond.start()
00073 self.assertFalse(bond.wait_until_formed(rospy.Duration(1.0)))
00074 self.assertTrue(bond.wait_until_broken(rospy.Duration(20.0)))
00075
00076
00077 def test_heartbeat_timeout(self):
00078 id = gen_id()
00079 test_bond.wait_for_service()
00080 test_bond(id=id, topic=TOPIC, delay_death=rospy.Duration(2.0), inhibit_death_message=True)
00081 bond = bondpy.Bond(TOPIC, id)
00082 bond.start()
00083
00084 self.assertTrue(bond.wait_until_formed(rospy.Duration(2.0)))
00085 self.assertTrue(bond.wait_until_broken(rospy.Duration(10.0)))
00086
00087
00088 def test_i_die(self):
00089 id = gen_id()
00090 test_bond.wait_for_service()
00091 test_bond(id = id, topic = TOPIC, delay_death = rospy.Duration(-1))
00092 bond = bondpy.Bond(TOPIC, id)
00093 bond.start()
00094
00095 self.assertTrue(bond.wait_until_formed(rospy.Duration(2.0)))
00096
00097 bond.break_bond()
00098 self.assertTrue(bond.wait_until_broken(rospy.Duration(2.0)))
00099
00100
00101 def test_die_no_ack(self):
00102 id = gen_id()
00103 test_bond.wait_for_service()
00104 test_bond(id=id, topic=TOPIC, delay_death=rospy.Duration(-1), inhibit_death_message=True)
00105 bond = bondpy.Bond(TOPIC, id)
00106 bond.start()
00107
00108 self.assertTrue(bond.wait_until_formed(rospy.Duration(2.0)))
00109
00110 bond.break_bond()
00111 self.assertTrue(bond.wait_until_broken(rospy.Duration(5.0)))
00112
00113
00114 def test_die_ignore_death(self):
00115 id = gen_id()
00116 test_bond.wait_for_service()
00117 test_bond(id=id, topic=TOPIC, delay_death=rospy.Duration(-1), inhibit_death=True)
00118 bond = bondpy.Bond(TOPIC, id)
00119 bond.start()
00120
00121 self.assertTrue(bond.wait_until_formed(rospy.Duration(2.0)))
00122
00123 bond.break_bond()
00124 self.assertFalse(bond.wait_until_broken(rospy.Duration(1.0)))
00125 self.assertTrue(bond.wait_until_broken(rospy.Duration(10.0)))
00126
00127
00128 def main():
00129 rospy.init_node('exercise_bondpy', anonymous=True, disable_signals=True)
00130 rostest.run(PKG, 'exercise_bondpy', Exerciser, sys.argv)
00131
00132 if __name__ == '__main__': main()