36 import roslib; roslib.load_manifest(PKG)
38 from bondpy
import bondpy
39 from test_bond.srv
import TestBond
45 atexit.register(rospy.signal_shutdown,
'exit')
49 return "test_" + str(uuid.uuid4())
52 TOPIC =
"test_bond_topic" 54 test_bond = rospy.ServiceProxy(
'test_bond', TestBond)
62 test_bond.wait_for_service()
63 test_bond(id=id, topic=TOPIC, delay_death=rospy.Duration(2.0))
64 rospy.logerr(
"test_normal: test_bond service call just returned")
65 self.
bond = bond = bondpy.Bond(TOPIC, id)
68 bond_start_time = time.time()
70 formed = bond.wait_until_formed(rospy.Duration(2.0))
74 s +=
"wait_until_formed returned %s\n" % formed
75 s +=
"Bond details:\n" 76 s +=
" Started = %s\n" % bond._Bond__started
77 s +=
" sister_instance_id = %s\n" % bond.sister_instance_id
78 s +=
" is_shutdown = %s\n" % bond.is_shutdown
79 s +=
" num connections = %d\n" % bond.pub.get_num_connections()
81 formed_later = bond.wait_until_formed(rospy.Duration(20.0))
82 s +=
"Formed later: %s after %.3f seconds\n" % (
83 formed_later, time.time() - bond_start_time)
89 self.assertTrue(bond.wait_until_broken(rospy.Duration(5.0)))
95 self.
bond = bond = bondpy.Bond(TOPIC, id)
97 self.assertFalse(bond.wait_until_formed(rospy.Duration(1.0)))
98 self.assertTrue(bond.wait_until_broken(rospy.Duration(20.0)))
103 test_bond.wait_for_service()
104 test_bond(id=id, topic=TOPIC, delay_death=rospy.Duration(2.0), inhibit_death_message=
True)
105 self.
bond = bond = bondpy.Bond(TOPIC, id)
108 self.assertTrue(bond.wait_until_formed(rospy.Duration(2.0)))
109 self.assertTrue(bond.wait_until_broken(rospy.Duration(10.0)))
114 test_bond.wait_for_service()
115 test_bond(id=id, topic=TOPIC, delay_death=rospy.Duration(-1))
116 self.
bond = bond = bondpy.Bond(TOPIC, id)
119 self.assertTrue(bond.wait_until_formed(rospy.Duration(2.0)))
122 self.assertTrue(bond.wait_until_broken(rospy.Duration(2.0)))
127 test_bond.wait_for_service()
128 test_bond(id=id, topic=TOPIC, delay_death=rospy.Duration(-1), inhibit_death_message=
True)
129 self.
bond = bond = bondpy.Bond(TOPIC, id)
132 self.assertTrue(bond.wait_until_formed(rospy.Duration(2.0)))
135 self.assertTrue(bond.wait_until_broken(rospy.Duration(5.0)))
140 test_bond.wait_for_service()
141 test_bond(id=id, topic=TOPIC, delay_death=rospy.Duration(-1), inhibit_death=
True)
142 self.
bond = bond = bondpy.Bond(TOPIC, id)
145 self.assertTrue(bond.wait_until_formed(rospy.Duration(2.0)))
148 self.assertFalse(bond.wait_until_broken(rospy.Duration(1.0)))
149 self.assertTrue(bond.wait_until_broken(rospy.Duration(10.0)))
154 self.
bond = bondpy.Bond(TOPIC, id)
169 rospy.init_node(
'exercise_bondpy', anonymous=
True, disable_signals=
True)
170 rostest.run(PKG,
'exercise_bondpy', Exerciser, sys.argv)
173 if __name__ ==
'__main__':
def test_die_ignore_death(self)
def test_no_connect(self)
def test_die_no_ack(self)
def test_shutdown_before_start(self)
def test_heartbeat_timeout(self)