test_advertisements.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 #
00003 # License: BSD
00004 #   https://raw.github.com/robotics-in-concert/rocon_multimaster/hydro_devel/rocon_gateway_tests/LICENSE
00005 #
00006 ##############################################################################
00007 # Imports
00008 ##############################################################################
00009 
00010 import rospy
00011 import sys
00012 import rocon_console.console as console
00013 from rocon_gateway import samples
00014 from rocon_gateway import GatewaySampleRuntimeError
00015 from rocon_gateway import Graph
00016 from rocon_gateway import GatewayError
00017 import unittest
00018 import rosunit
00019 
00020 
00021 
00022 ##############################################################################
00023 # Main
00024 ##############################################################################
00025 
00026 class TestGraph(unittest.TestCase):
00027 
00028     def setUp(self):
00029         rospy.init_node('test_advertisements')
00030         self.graph = Graph()
00031 
00032     def test_advertise_all(self):
00033         print("\n********************************************************************")
00034         print("* Advertise All")
00035         print("********************************************************************")
00036         try:
00037             samples.advertise_all()
00038         except GatewaySampleRuntimeError as e:
00039             self.fail("Runtime error caught when advertising all connections.")
00040         public_interface = self._wait_for_public_interface()
00041         #print("%s" % self.graph._local_gateway)
00042         self.assertEquals("2", str(len(public_interface)))
00043         for rule in public_interface:
00044             self.assertEquals("/chatter", rule.name)
00045             # Should probably assert rule.type and rule.node here as well.
00046         # Revert state
00047         try:
00048             samples.advertise_all(cancel=True)
00049         except GatewaySampleRuntimeError as e:
00050             self.fail("Runtime error caught when unadvertising all connections.")
00051         self._assert_cleared_public_interface()
00052 
00053     def test_advertise_tutorials(self):
00054         print("\n********************************************************************")
00055         print("* Advertise Tutorials")
00056         print("********************************************************************")
00057         try:
00058             samples.advertise_tutorials() 
00059         except GatewaySampleRuntimeError as e:
00060             self.fail("Runtime error caught when advertising tutorial connections.")
00061         public_interface = self._wait_for_public_interface()
00062         #print("%s" % self.graph._local_gateway)
00063         self.assertIn("/chatter", [rule.name for rule in public_interface])
00064         try:
00065             samples.advertise_tutorials(cancel=True) 
00066         except GatewaySampleRuntimeError as e:
00067             self.fail("Runtime error caught when unadvertising tutorial connections.")
00068         self._assert_cleared_public_interface()
00069 
00070     def test_advertise_regex_tutorials(self):
00071         print("\n********************************************************************")
00072         print("* Advertise Regex Tutorials")
00073         print("********************************************************************")
00074         try:
00075             samples.advertise_tutorials(regex_patterns=True) 
00076         except GatewaySampleRuntimeError as e:
00077             self.fail("Runtime error caught when advertising tutorial connections.")
00078         public_interface = self._wait_for_public_interface()
00079         print("%s" % self.graph._local_gateway)
00080         self.assertIn("/chatter", [rule.name for rule in public_interface])
00081         try:
00082             samples.advertise_tutorials(cancel=True, regex_patterns=True) 
00083         except GatewaySampleRuntimeError as e:
00084             self.fail("Runtime error caught when unadvertising tutorial connections.")
00085         self._assert_cleared_public_interface()
00086         
00087     def tearDown(self):
00088         pass
00089 
00090     ##########################################################################
00091     # Utility methods
00092     ##########################################################################
00093 
00094     def _wait_for_public_interface(self):
00095         public_interface = None
00096         while not public_interface:
00097             self.graph.update()
00098             public_interface = self.graph._local_gateway.public_interface
00099             rospy.rostime.wallsleep(0.2)
00100         return public_interface
00101 
00102     def _assert_cleared_public_interface(self):
00103         start_time = rospy.Time.now()
00104         while True:
00105             self.graph.update()
00106             public_interface = self.graph._local_gateway.public_interface
00107             if public_interface:
00108                 result = "cleared"
00109                 break
00110             else:
00111                 rospy.rostime.wallsleep(0.2)
00112             if rospy.Time.now() - start_time > rospy.Duration(2.0):
00113                 result = "timed out waiting for public interface to clear"
00114                 break
00115         self.assertEqual("cleared", result)
00116 
00117 NAME = 'test_graph'
00118 if __name__ == '__main__':
00119     rosunit.unitrun('test_graph', NAME, TestGraph, sys.argv, coverage_packages=['rocon_gateway'])
00120         


rocon_gateway_tests
Author(s): Daniel Stonier, Jihoon Lee, Piyush Khandelwal
autogenerated on Sat Jun 8 2019 18:48:48