test_pulls.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 #
00003 # License: BSD
00004 #   https://raw.github.com/robotics-in-concert/rocon_multimaster/hydro_devel/rocon_gateway_tests/LICENSE
00005 #
00006 ##############################################################################
00007 # Imports
00008 ##############################################################################
00009 
00010 import rospy
00011 import sys
00012 import rocon_console.console as console
00013 from rocon_gateway import samples
00014 from rocon_gateway import GatewaySampleRuntimeError
00015 from rocon_gateway import Graph
00016 from rocon_gateway import GatewayError
00017 import unittest
00018 import rosunit
00019 
00020 
00021 ##############################################################################
00022 # Main
00023 ##############################################################################
00024 
00025 class TestPulls(unittest.TestCase):
00026 
00027     def setUp(self):
00028         print("\n********************************************************************")
00029         print("* Pull Tests Setup")
00030         print("********************************************************************")
00031         rospy.init_node('test_pulls')
00032         self.graph = Graph()
00033 
00034     def test_pull_all(self):
00035         print("\n********************************************************************")
00036         print("* Pull All")
00037         print("********************************************************************")
00038         try:
00039             samples.pull_all()
00040         except GatewaySampleRuntimeError as e:
00041             self.fail("Runtime error caught when advertising all connections.")
00042         pulled_interface = self._wait_for_pulled_interface()
00043         #print("%s" % self.graph._local_gateway)
00044         self.assertEquals("2", str(len(pulled_interface)))
00045         for remote_rule in pulled_interface:
00046             self.assertEquals("/chatter", remote_rule.rule.name)
00047             # Should probably assert rule.type and rule.node here as well.
00048         # Revert state
00049         try:
00050             samples.pull_all(cancel=True)
00051         except GatewaySampleRuntimeError as e:
00052             self.fail("Runtime error caught when unadvertising all connections.")
00053         self._assert_cleared_pulled_interface()
00054 
00055     def test_pull_tutorials(self):
00056         print("\n********************************************************************")
00057         print("* Pull Tutorials")
00058         print("********************************************************************")
00059         try:
00060             samples.pull_tutorials() 
00061         except GatewaySampleRuntimeError as e:
00062             self.fail("Runtime error caught when advertising tutorial connections.")
00063         pulled_interface = self._wait_for_pulled_interface()
00064         #print("%s" % self.graph._local_gateway)
00065         self.assertIn("/chatter", [remote_rule.rule.name for remote_rule in pulled_interface])
00066         try:
00067             samples.pull_tutorials(cancel=True) 
00068         except GatewaySampleRuntimeError as e:
00069             self.fail("Runtime error caught when unadvertising tutorial connections.")
00070         self._assert_cleared_pulled_interface()
00071 
00072     def test_pull_regex_tutorials(self):
00073         print("\n********************************************************************")
00074         print("* Pull Regex Tutorials")
00075         print("********************************************************************")
00076         try:
00077             samples.pull_tutorials(regex_patterns=True) 
00078         except GatewaySampleRuntimeError as e:
00079             self.fail("Runtime error caught when advertising tutorial connections.")
00080         pulled_interface = self._wait_for_pulled_interface()
00081         print("%s" % self.graph._local_gateway)
00082         self.assertIn("/chatter", [remote_rule.rule.name for remote_rule in pulled_interface])
00083         try:
00084             samples.pull_tutorials(cancel=True, regex_patterns=True) 
00085         except GatewaySampleRuntimeError as e:
00086             self.fail("Runtime error caught when unadvertising tutorial connections.")
00087         self._assert_cleared_pulled_interface()
00088         
00089     def tearDown(self):
00090         pass
00091 
00092     ##########################################################################
00093     # Utility methods
00094     ##########################################################################
00095 
00096     def _wait_for_pulled_interface(self):
00097         pulled_interface = None
00098         while not pulled_interface:
00099             self.graph.update()
00100             pulled_interface = self.graph._local_gateway.pulled_connections
00101             rospy.sleep(1.0)
00102         return pulled_interface
00103 
00104     def _assert_cleared_pulled_interface(self):
00105         start_time = rospy.Time.now()
00106         while True:
00107             self.graph.update()
00108             pulled_interface = self.graph._local_gateway.pulled_connections
00109             if pulled_interface:
00110                 result = "cleared"
00111                 break
00112             else:
00113                 rospy.sleep(0.2)
00114             if rospy.Time.now() - start_time > rospy.Duration(1.0):
00115                 result = "timed out waiting for pulled interface to clear"
00116                 break
00117         self.assertEqual("cleared", result)
00118 
00119 NAME = 'test_pulls'
00120 if __name__ == '__main__':
00121     rosunit.unitrun('test_pulls', NAME, TestPulls, sys.argv, coverage_packages=['rocon_gateway'])
00122         


rocon_gateway_tests
Author(s): Daniel Stonier, Jihoon Lee, Piyush Khandelwal
autogenerated on Sat Jun 8 2019 18:48:48