Go to the documentation of this file.00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 
00031 import sys
00032 import threading
00033 import time
00034 import uuid
00035 
00036 PKG = 'test_bond'
00037 import roslib; roslib.load_manifest(PKG)
00038 import rospy
00039 import bondpy
00040 from test_bond.srv import *
00041 
00042 import unittest
00043 import rostest
00044 
00045 import atexit
00046 atexit.register(rospy.signal_shutdown, 'exit')
00047 
00048 def gen_id():
00049     return "test_" + str(uuid.uuid4())
00050 TOPIC = "test_bond_topic"
00051 
00052 test_bond = rospy.ServiceProxy('test_bond', TestBond)
00053 
00054 class Exerciser(unittest.TestCase):
00055 
00056     
00057     def test_normal(self):
00058         id = gen_id()
00059         test_bond.wait_for_service()
00060         test_bond(id = id, topic = TOPIC, delay_death = rospy.Duration(2.0))
00061         rospy.logerr("test_normal: test_bond service call just returned")
00062         self.bond = bond = bondpy.Bond(TOPIC, id)
00063         bond.start()
00064         
00065         bond_start_time = time.time()
00066 
00067         formed = bond.wait_until_formed(rospy.Duration(2.0))
00068         if not formed:
00069             s = ''
00070             s += "BONDFAIL\n"
00071             s += "wait_until_formed returned %s\n" % formed
00072             s += "Bond details:\n"
00073             s += "  Started = %s\n" % bond._Bond__started
00074             s += "  sister_instance_id = %s\n" % bond.sister_instance_id
00075             s += "  is_shutdown = %s\n" % bond.is_shutdown
00076             s += "  num connections = %d\n" % bond.pub.get_num_connections()
00077 
00078             formed_later = bond.wait_until_formed(rospy.Duration(20.0))
00079             s += "Formed later: %s after %.3f seconds\n" % (formed_later, time.time() - bond_start_time)
00080 
00081             rospy.logerr(s)
00082             self.fail(s)
00083 
00084         
00085         self.assertTrue(bond.wait_until_broken(rospy.Duration(5.0)))
00086 
00087     
00088     def test_no_connect(self):
00089         id = gen_id()
00090         
00091         self.bond = bond = bondpy.Bond(TOPIC, id)
00092         bond.start()
00093         self.assertFalse(bond.wait_until_formed(rospy.Duration(1.0)))
00094         self.assertTrue(bond.wait_until_broken(rospy.Duration(20.0)))
00095 
00096     
00097     def test_heartbeat_timeout(self):
00098         id = gen_id()
00099         test_bond.wait_for_service()
00100         test_bond(id=id, topic=TOPIC, delay_death=rospy.Duration(2.0), inhibit_death_message=True)
00101         self.bond = bond = bondpy.Bond(TOPIC, id)
00102         bond.start()
00103 
00104         self.assertTrue(bond.wait_until_formed(rospy.Duration(2.0)))
00105         self.assertTrue(bond.wait_until_broken(rospy.Duration(10.0)))
00106 
00107     
00108     def test_i_die(self):
00109         id = gen_id()
00110         test_bond.wait_for_service()
00111         test_bond(id = id, topic = TOPIC, delay_death = rospy.Duration(-1))
00112         self.bond = bond = bondpy.Bond(TOPIC, id)
00113         bond.start()
00114 
00115         self.assertTrue(bond.wait_until_formed(rospy.Duration(2.0)))
00116 
00117         bond.break_bond()
00118         self.assertTrue(bond.wait_until_broken(rospy.Duration(2.0)))
00119 
00120     
00121     def test_die_no_ack(self):
00122         id = gen_id()
00123         test_bond.wait_for_service()
00124         test_bond(id=id, topic=TOPIC, delay_death=rospy.Duration(-1), inhibit_death_message=True)
00125         self.bond = bond = bondpy.Bond(TOPIC, id)
00126         bond.start()
00127 
00128         self.assertTrue(bond.wait_until_formed(rospy.Duration(2.0)))
00129 
00130         bond.break_bond()
00131         self.assertTrue(bond.wait_until_broken(rospy.Duration(5.0)))
00132 
00133     
00134     def test_die_ignore_death(self):
00135         id = gen_id()
00136         test_bond.wait_for_service()
00137         test_bond(id=id, topic=TOPIC, delay_death=rospy.Duration(-1), inhibit_death=True)
00138         self.bond = bond = bondpy.Bond(TOPIC, id)
00139         bond.start()
00140 
00141         self.assertTrue(bond.wait_until_formed(rospy.Duration(2.0)))
00142 
00143         bond.break_bond()
00144         self.assertFalse(bond.wait_until_broken(rospy.Duration(1.0)))
00145         self.assertTrue(bond.wait_until_broken(rospy.Duration(10.0)))
00146 
00147     def setUp(self):
00148         self.bond = None
00149     def tearDown(self):
00150         if self.bond:
00151             self.bond.shutdown()
00152             self.bond = None
00153 
00154 
00155 def main():
00156     rospy.init_node('exercise_bondpy', anonymous=True, disable_signals=True)
00157     rostest.run(PKG, 'exercise_bondpy', Exerciser, sys.argv)
00158 
00159 if __name__ == '__main__': main()