test_flips.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 #
00003 # License: BSD
00004 #   https://raw.github.com/robotics-in-concert/rocon_multimaster/hydro_devel/rocon_gateway_tests/LICENSE
00005 #
00006 ##############################################################################
00007 # Imports
00008 ##############################################################################
00009 
00010 import rospy
00011 import sys
00012 import rocon_console.console as console
00013 from rocon_gateway import samples
00014 from rocon_gateway import GatewaySampleRuntimeError
00015 from rocon_gateway import Graph
00016 from rocon_gateway import GatewayError
00017 import unittest
00018 import rosunit
00019 
00020 
00021 ##############################################################################
00022 # Main
00023 ##############################################################################
00024 
00025 class TestFlips(unittest.TestCase):
00026 
00027     def setUp(self):
00028         rospy.init_node('test_pullments')
00029         self.graph = Graph()
00030 
00031     def test_flip_all(self):
00032         print("\n********************************************************************")
00033         print("* Flip All")
00034         print("********************************************************************")
00035         try:
00036             samples.flip_all()
00037         except GatewaySampleRuntimeError as e:
00038             self.fail("Runtime error caught when flipping all connections [%s]" % str(e))
00039         flipped_interface = self._wait_for_flipped_interface()
00040         #print("%s" % self.graph._local_gateway)
00041         self.assertEquals("2", str(len(flipped_interface)))
00042         for remote_rule in flipped_interface:
00043             self.assertEquals("/chatter", remote_rule.remote_rule.rule.name)
00044             # Should probably assert rule.type and rule.node here as well.
00045         # Revert state
00046         try:
00047             samples.flip_all(cancel=True)
00048         except GatewaySampleRuntimeError as e:
00049             self.fail("Runtime error caught when unflipping all connections [%s]" % str(e))
00050         self._assert_cleared_flipped_interface()
00051 
00052     def test_flip_tutorials(self):
00053         print("\n********************************************************************")
00054         print("* Flip Tutorials")
00055         print("********************************************************************")
00056         try:
00057             samples.flip_tutorials() 
00058         except GatewaySampleRuntimeError as e:
00059             self.fail("Runtime error caught when flipping tutorial connections.")
00060         flipped_interface = self._wait_for_flipped_interface()
00061         #print("%s" % self.graph._local_gateway)
00062         self.assertIn("/chatter", [remote_rule.remote_rule.rule.name for remote_rule in flipped_interface])
00063         try:
00064             samples.flip_tutorials(cancel=True) 
00065         except GatewaySampleRuntimeError as e:
00066             self.fail("Runtime error caught when unflipping tutorial connections.")
00067         self._assert_cleared_flipped_interface()
00068 
00069     def test_flip_regex_tutorials(self):
00070         print("\n********************************************************************")
00071         print("* Flip Regex Tutorials")
00072         print("********************************************************************")
00073         try:
00074             samples.flip_tutorials(regex_patterns=True) 
00075         except GatewaySampleRuntimeError as e:
00076             self.fail("Runtime error caught when flipping tutorial connections.")
00077         flipped_interface = self._wait_for_flipped_interface()
00078         print("%s" % self.graph._local_gateway)
00079         self.assertIn("/chatter", [remote_rule.remote_rule.rule.name for remote_rule in flipped_interface])
00080         try:
00081             samples.flip_tutorials(cancel=True, regex_patterns=True) 
00082         except GatewaySampleRuntimeError as e:
00083             self.fail("Runtime error caught when unflipping tutorial connections.")
00084         self._assert_cleared_flipped_interface()
00085         
00086     def tearDown(self):
00087         pass
00088 
00089     ##########################################################################
00090     # Utility methods
00091     ##########################################################################
00092 
00093     def _wait_for_flipped_interface(self):
00094         flipped_interface = None
00095         while not flipped_interface:
00096             self.graph.update()
00097             flipped_interface = self.graph._local_gateway.flipped_connections
00098             rospy.sleep(0.2)
00099         return flipped_interface
00100 
00101     def _assert_cleared_flipped_interface(self):
00102         start_time = rospy.Time.now()
00103         while True:
00104             self.graph.update()
00105             flipped_interface = self.graph._local_gateway.flipped_connections
00106             if flipped_interface:
00107                 result = "cleared"
00108                 break
00109             else:
00110                 rospy.sleep(0.2)
00111             if rospy.Time.now() - start_time > rospy.Duration(1.0):
00112                 result = "timed out waiting for flipped interface to clear"
00113                 break
00114         self.assertEqual("cleared", result)
00115 
00116 NAME = 'test_flips'
00117 if __name__ == '__main__':
00118     rosunit.unitrun('test_flips', NAME, TestFlips, sys.argv, coverage_packages=['rocon_gateway'])
00119         


rocon_gateway_tests
Author(s): Daniel Stonier, Jihoon Lee, Piyush Khandelwal
autogenerated on Sat Jun 8 2019 18:48:48