exercise_bond.py
Go to the documentation of this file.
1 #! /usr/bin/env python
2 # Copyright (c) 2009, Willow Garage, Inc.
3 # All rights reserved.
4 #
5 # Redistribution and use in source and binary forms, with or without
6 # modification, are permitted provided that the following conditions are met:
7 #
8 # * Redistributions of source code must retain the above copyright
9 # notice, this list of conditions and the following disclaimer.
10 # * Redistributions in binary form must reproduce the above copyright
11 # notice, this list of conditions and the following disclaimer in the
12 # documentation and/or other materials provided with the distribution.
13 # * Neither the name of the Willow Garage, Inc. nor the names of its
14 # contributors may be used to endorse or promote products derived from
15 # this software without specific prior written permission.
16 #
17 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28 
29 # Author: Stuart Glaser
30 
31 import sys
32 import time
33 import uuid
34 
35 PKG = 'test_bond'
36 import roslib; roslib.load_manifest(PKG)
37 import rospy
38 from bondpy import bondpy
39 from test_bond.srv import TestBond
40 
41 import unittest
42 import rostest
43 
44 import atexit
45 atexit.register(rospy.signal_shutdown, 'exit')
46 
47 
48 def gen_id():
49  return "test_" + str(uuid.uuid4())
50 
51 
52 TOPIC = "test_bond_topic"
53 
54 test_bond = rospy.ServiceProxy('test_bond', TestBond)
55 
56 
57 class Exerciser(unittest.TestCase):
58 
59  # Bond formed, bond broken remotely
60  def test_normal(self):
61  id = gen_id()
62  test_bond.wait_for_service()
63  test_bond(id=id, topic=TOPIC, delay_death=rospy.Duration(2.0))
64  rospy.logerr("test_normal: test_bond service call just returned")
65  self.bond = bond = bondpy.Bond(TOPIC, id)
66  bond.start()
67 
68  bond_start_time = time.time()
69 
70  formed = bond.wait_until_formed(rospy.Duration(2.0))
71  if not formed:
72  s = ''
73  s += "BONDFAIL\n"
74  s += "wait_until_formed returned %s\n" % formed
75  s += "Bond details:\n"
76  s += " Started = %s\n" % bond._Bond__started
77  s += " sister_instance_id = %s\n" % bond.sister_instance_id
78  s += " is_shutdown = %s\n" % bond.is_shutdown
79  s += " num connections = %d\n" % bond.pub.get_num_connections()
80 
81  formed_later = bond.wait_until_formed(rospy.Duration(20.0))
82  s += "Formed later: %s after %.3f seconds\n" % (
83  formed_later, time.time() - bond_start_time)
84 
85  rospy.logerr(s)
86  self.fail(s)
87 
88  # self.assertTrue(bond.wait_until_formed(rospy.Duration(2.0)))
89  self.assertTrue(bond.wait_until_broken(rospy.Duration(5.0)))
90 
91  # Remote never connects
92  def test_no_connect(self):
93  id = gen_id()
94  # Don't start the other side of the bond
95  self.bond = bond = bondpy.Bond(TOPIC, id)
96  bond.start()
97  self.assertFalse(bond.wait_until_formed(rospy.Duration(1.0)))
98  self.assertTrue(bond.wait_until_broken(rospy.Duration(20.0)))
99 
100  # Remote dies (but doesn't report it)
102  id = gen_id()
103  test_bond.wait_for_service()
104  test_bond(id=id, topic=TOPIC, delay_death=rospy.Duration(2.0), inhibit_death_message=True)
105  self.bond = bond = bondpy.Bond(TOPIC, id)
106  bond.start()
107 
108  self.assertTrue(bond.wait_until_formed(rospy.Duration(2.0)))
109  self.assertTrue(bond.wait_until_broken(rospy.Duration(10.0)))
110 
111  # Local dies, remote acks normally
112  def test_i_die(self):
113  id = gen_id()
114  test_bond.wait_for_service()
115  test_bond(id=id, topic=TOPIC, delay_death=rospy.Duration(-1))
116  self.bond = bond = bondpy.Bond(TOPIC, id)
117  bond.start()
118 
119  self.assertTrue(bond.wait_until_formed(rospy.Duration(2.0)))
120 
121  bond.break_bond()
122  self.assertTrue(bond.wait_until_broken(rospy.Duration(2.0)))
123 
124  # Local dies, remote goes silent
125  def test_die_no_ack(self):
126  id = gen_id()
127  test_bond.wait_for_service()
128  test_bond(id=id, topic=TOPIC, delay_death=rospy.Duration(-1), inhibit_death_message=True)
129  self.bond = bond = bondpy.Bond(TOPIC, id)
130  bond.start()
131 
132  self.assertTrue(bond.wait_until_formed(rospy.Duration(2.0)))
133 
134  bond.break_bond()
135  self.assertTrue(bond.wait_until_broken(rospy.Duration(5.0)))
136 
137  # Local dies, remote stays alive
139  id = gen_id()
140  test_bond.wait_for_service()
141  test_bond(id=id, topic=TOPIC, delay_death=rospy.Duration(-1), inhibit_death=True)
142  self.bond = bond = bondpy.Bond(TOPIC, id)
143  bond.start()
144 
145  self.assertTrue(bond.wait_until_formed(rospy.Duration(2.0)))
146 
147  bond.break_bond()
148  self.assertFalse(bond.wait_until_broken(rospy.Duration(1.0)))
149  self.assertTrue(bond.wait_until_broken(rospy.Duration(10.0)))
150 
151  # Shutdown() before start()
153  id = gen_id()
154  self.bond = bondpy.Bond(TOPIC, id)
155 
156  self.bond.shutdown()
157  self.bond = None
158 
159  def setUp(self):
160  self.bond = None
161 
162  def tearDown(self):
163  if self.bond:
164  self.bond.shutdown()
165  self.bond = None
166 
167 
168 def main():
169  rospy.init_node('exercise_bondpy', anonymous=True, disable_signals=True)
170  rostest.run(PKG, 'exercise_bondpy', Exerciser, sys.argv)
171 
172 
173 if __name__ == '__main__':
174  main()
def test_shutdown_before_start(self)
def test_heartbeat_timeout(self)


test_bond
Author(s): Stuart Glaser
autogenerated on Wed Sep 2 2020 03:40:47