Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010 import rospy
00011 import sys
00012 import rocon_console.console as console
00013 from rocon_gateway import samples
00014 from rocon_gateway import GatewaySampleRuntimeError
00015 from rocon_gateway import Graph
00016 from rocon_gateway import GatewayError
00017 import unittest
00018 import rosunit
00019
00020
00021
00022
00023
00024
class TestFlips(unittest.TestCase):
    """Exercise the rocon_gateway flip samples against the local gateway.

    Each test requests a flip through ``rocon_gateway.samples``, waits until
    the local gateway's ``flipped_connections`` becomes non-empty, checks the
    names of the flipped rules, then cancels the flip and verifies that the
    flipped interface empties again.
    """

    # Horizontal rule printed above and below each test title.
    _BANNER = "********************************************************************"
    # Seconds to sleep between two consecutive graph polls.
    _POLL_PERIOD = 0.2
    # Seconds allowed for the flipped interface to empty after a cancel.
    _CLEAR_TIMEOUT = 1.0

    def setUp(self):
        """Initialise the ROS node and create a fresh gateway graph."""
        # NOTE(review): the node name says 'pullments' although this suite
        # tests flips - confirm whether that is intentional before renaming.
        rospy.init_node('test_pullments')
        self.graph = Graph()

    def test_flip_all(self):
        """Flip everything, expect two '/chatter' rules, then unflip."""
        self._print_banner("Flip All")
        self._run_sample(samples.flip_all, "flipping all connections")
        flipped_interface = self._wait_for_flipped_interface()

        self.assertEqual(2, len(flipped_interface))
        for flipped in flipped_interface:
            self.assertEqual("/chatter", flipped.remote_rule.rule.name)

        self._run_sample(samples.flip_all, "unflipping all connections", cancel=True)
        self._assert_cleared_flipped_interface()

    def test_flip_tutorials(self):
        """Flip the tutorial connections, expect '/chatter', then unflip."""
        self._print_banner("Flip Tutorials")
        self._run_sample(samples.flip_tutorials, "flipping tutorial connections")
        flipped_interface = self._wait_for_flipped_interface()

        self.assertIn("/chatter", self._rule_names(flipped_interface))

        self._run_sample(samples.flip_tutorials, "unflipping tutorial connections", cancel=True)
        self._assert_cleared_flipped_interface()

    def test_flip_regex_tutorials(self):
        """Same as test_flip_tutorials, but with ``regex_patterns=True``."""
        self._print_banner("Flip Regex Tutorials")
        self._run_sample(samples.flip_tutorials, "flipping tutorial connections",
                         regex_patterns=True)
        flipped_interface = self._wait_for_flipped_interface()
        # Dump the local gateway state to ease debugging of regex matches.
        print("%s" % self.graph._local_gateway)

        self.assertIn("/chatter", self._rule_names(flipped_interface))

        self._run_sample(samples.flip_tutorials, "unflipping tutorial connections",
                         cancel=True, regex_patterns=True)
        self._assert_cleared_flipped_interface()

    def tearDown(self):
        """Nothing to clean up."""
        pass

    ##########################################################################
    # Utility methods
    ##########################################################################

    def _print_banner(self, title):
        """Print the framed title that introduces a test in the console log."""
        print("\n" + self._BANNER)
        print("* " + title)
        print(self._BANNER)

    def _run_sample(self, sample, action, **kwargs):
        """Call ``sample(**kwargs)`` and fail the test on a sample runtime error.

        :param sample: callable from ``rocon_gateway.samples`` to invoke.
        :param action: short description of what is being attempted, used in
            the failure message (e.g. "flipping all connections").
        :param kwargs: keyword arguments forwarded verbatim to ``sample``.
        """
        try:
            sample(**kwargs)
        except GatewaySampleRuntimeError as e:
            # Always surface the underlying error text; it is the only hint
            # about why the gateway rejected the request.
            self.fail("Runtime error caught when %s [%s]" % (action, str(e)))

    @staticmethod
    def _rule_names(flipped_interface):
        """Return the rule name of every entry in ``flipped_interface``."""
        return [flipped.remote_rule.rule.name for flipped in flipped_interface]

    def _wait_for_flipped_interface(self):
        """Poll the graph until the local gateway reports flipped connections.

        Blocks without a timeout: the loop only ends once
        ``flipped_connections`` is non-empty.

        :returns: the non-empty ``flipped_connections`` of the local gateway.
        """
        while True:
            self.graph.update()
            flipped_interface = self.graph._local_gateway.flipped_connections
            if flipped_interface:
                return flipped_interface
            rospy.sleep(self._POLL_PERIOD)

    def _assert_cleared_flipped_interface(self):
        """Assert that the flipped interface empties within the clear timeout.

        Polls the graph every ``_POLL_PERIOD`` seconds and fails the test if
        ``flipped_connections`` is still non-empty after ``_CLEAR_TIMEOUT``
        seconds.
        """
        start_time = rospy.Time.now()
        timeout = rospy.Duration(self._CLEAR_TIMEOUT)
        result = "timed out waiting for flipped interface to clear"
        while True:
            self.graph.update()
            # The interface is cleared when nothing is flipped any more; the
            # check used to be inverted and reported success while
            # connections were still flipped.
            if not self.graph._local_gateway.flipped_connections:
                result = "cleared"
                break
            rospy.sleep(self._POLL_PERIOD)
            if rospy.Time.now() - start_time > timeout:
                break
        self.assertEqual("cleared", result)
00115
# Test name handed to rosunit as the second argument of unitrun().
NAME = 'test_flips'

if __name__ == '__main__':
    # Run the TestFlips suite through rosunit, forwarding the command-line
    # arguments and requesting coverage for the rocon_gateway package.
    rosunit.unitrun(
        'test_flips',
        NAME,
        TestFlips,
        sys.argv,
        coverage_packages=['rocon_gateway']
    )
00119