Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010 import rospy
00011 import sys
00012 import rocon_console.console as console
00013 from rocon_gateway import samples
00014 from rocon_gateway import GatewaySampleRuntimeError
00015 from rocon_gateway import Graph
00016 from rocon_gateway import GatewayError
00017 import unittest
00018 import rosunit
00019
00020
00021
00022
00023
00024
00025 class TestPulls(unittest.TestCase):
00026
00027 def setUp(self):
00028 print("\n********************************************************************")
00029 print("* Pull Tests Setup")
00030 print("********************************************************************")
00031 rospy.init_node('test_pulls')
00032 self.graph = Graph()
00033
00034 def test_pull_all(self):
00035 print("\n********************************************************************")
00036 print("* Pull All")
00037 print("********************************************************************")
00038 try:
00039 samples.pull_all()
00040 except GatewaySampleRuntimeError as e:
00041 self.fail("Runtime error caught when advertising all connections.")
00042 pulled_interface = self._wait_for_pulled_interface()
00043
00044 self.assertEquals("2", str(len(pulled_interface)))
00045 for remote_rule in pulled_interface:
00046 self.assertEquals("/chatter", remote_rule.rule.name)
00047
00048
00049 try:
00050 samples.pull_all(cancel=True)
00051 except GatewaySampleRuntimeError as e:
00052 self.fail("Runtime error caught when unadvertising all connections.")
00053 self._assert_cleared_pulled_interface()
00054
00055 def test_pull_tutorials(self):
00056 print("\n********************************************************************")
00057 print("* Pull Tutorials")
00058 print("********************************************************************")
00059 try:
00060 samples.pull_tutorials()
00061 except GatewaySampleRuntimeError as e:
00062 self.fail("Runtime error caught when advertising tutorial connections.")
00063 pulled_interface = self._wait_for_pulled_interface()
00064
00065 self.assertIn("/chatter", [remote_rule.rule.name for remote_rule in pulled_interface])
00066 try:
00067 samples.pull_tutorials(cancel=True)
00068 except GatewaySampleRuntimeError as e:
00069 self.fail("Runtime error caught when unadvertising tutorial connections.")
00070 self._assert_cleared_pulled_interface()
00071
00072 def test_pull_regex_tutorials(self):
00073 print("\n********************************************************************")
00074 print("* Pull Regex Tutorials")
00075 print("********************************************************************")
00076 try:
00077 samples.pull_tutorials(regex_patterns=True)
00078 except GatewaySampleRuntimeError as e:
00079 self.fail("Runtime error caught when advertising tutorial connections.")
00080 pulled_interface = self._wait_for_pulled_interface()
00081 print("%s" % self.graph._local_gateway)
00082 self.assertIn("/chatter", [remote_rule.rule.name for remote_rule in pulled_interface])
00083 try:
00084 samples.pull_tutorials(cancel=True, regex_patterns=True)
00085 except GatewaySampleRuntimeError as e:
00086 self.fail("Runtime error caught when unadvertising tutorial connections.")
00087 self._assert_cleared_pulled_interface()
00088
00089 def tearDown(self):
00090 pass
00091
00092
00093
00094
00095
00096 def _wait_for_pulled_interface(self):
00097 pulled_interface = None
00098 while not pulled_interface:
00099 self.graph.update()
00100 pulled_interface = self.graph._local_gateway.pulled_connections
00101 rospy.sleep(1.0)
00102 return pulled_interface
00103
00104 def _assert_cleared_pulled_interface(self):
00105 start_time = rospy.Time.now()
00106 while True:
00107 self.graph.update()
00108 pulled_interface = self.graph._local_gateway.pulled_connections
00109 if pulled_interface:
00110 result = "cleared"
00111 break
00112 else:
00113 rospy.sleep(0.2)
00114 if rospy.Time.now() - start_time > rospy.Duration(1.0):
00115 result = "timed out waiting for pulled interface to clear"
00116 break
00117 self.assertEqual("cleared", result)
00118
00119 NAME = 'test_pulls'
00120 if __name__ == '__main__':
00121 rosunit.unitrun('test_pulls', NAME, TestPulls, sys.argv, coverage_packages=['rocon_gateway'])
00122