test_advertisements_locally.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 #       
00003 # License: BSD
00004 #   https://raw.github.com/robotics-in-concert/rocon_multimaster/master/rocon_gateway_tests/LICENSE 
00005 #
00006 
00007 PKG = 'rocon_gateway_tests'
00008 import roslib; roslib.load_manifest(PKG)
00009 import rospy
00010 import rostest
00011 from gateway_msgs.msg import *
00012 from gateway_msgs.srv import *
00013 import unittest
00014 import std_msgs
00015 import copy
00016 
00017 class TestAdvertisementsLocally(unittest.TestCase):
00018 
00019     def assertAdvertiseAllCall(self, blacklist, expected_interface):
00020         req = AdvertiseAllRequest()
00021         req.cancel = False
00022         req.blacklist = copy.deepcopy(blacklist)
00023 
00024         # make call and ensure it succeeded
00025         resp = self.advertiseAll(req)
00026         self.assertEquals(resp.result, Result.SUCCESS)
00027 
00028         # ensure expected interfaces came up
00029         self.assertPublicInterface(expected_interface)
00030 
00031         # remove everything
00032         req.cancel = True
00033         resp = self.advertiseAll(req)
00034         self.assertEquals(resp.result, Result.SUCCESS)
00035 
00036         self.assertPublicInterface([])
00037 
00038     def assertAdvertiseCall(self, watchlist, expected_interface):
00039 
00040         # Make the call
00041         req = AdvertiseRequest()
00042         req.rules = copy.deepcopy(watchlist)
00043         req.cancel = False
00044 
00045         # ensure call succeeded
00046         resp = self.advertise(req)
00047         self.assertEquals(resp.result, Result.SUCCESS)
00048         self.assertEquals(len(resp.watchlist), len(watchlist))
00049         for rule in resp.watchlist:
00050             self.assertIn(rule, watchlist)
00051 
00052         # ensure public interface comes up as expected
00053         self.assertPublicInterface(expected_interface)
00054 
00055         # remove everything and check if everything has been removed
00056         req.cancel = True
00057         resp = self.advertise(req)
00058         self.assertEquals(resp.result, Result.SUCCESS)
00059         self.assertEquals(len(resp.watchlist), 0)
00060         self.assertPublicInterface([])
00061 
00062     def assertPublicInterface(self, rules):
00063 
00064         expected_rules = copy.deepcopy(rules)
00065         num_nodes = len(expected_rules)
00066         while True:
00067             rospy.sleep(1.0)
00068             resp = self.gatewayInfo()
00069             node_names = []
00070             for i in resp.public_interface:
00071                 node_names.append(i.node)
00072             rospy.loginfo("TEST: Public Interface Nodes: %s"%str(node_names))
00073             if len(resp.public_interface) == num_nodes:
00074                 break
00075             rospy.loginfo("TEST:   Waiting for watcher thread to load all nodes.")
00076 
00077         for i in range(num_nodes):
00078             self.assertIn(resp.public_interface[i], expected_rules)
00079             expected_rules.remove(resp.public_interface[i])
00080 
00081         self.assertEqual(len(expected_rules), 0);
00082         rospy.loginfo("TEST: Public interface found as expected")
00083 
00084     def setUp(self):
00085         '''
00086           Run at the start of every test. This function performs the following:
00087             1. Ensure that gateway services are available
00088             2. Make sure gateway is connected to the hub
00089           If something goes wrong and the setup can not complete successfully,
00090           the test will timeout with an error
00091         '''
00092         rospy.wait_for_service('/gateway/advertise')
00093         rospy.wait_for_service('/gateway/advertise_all')
00094         rospy.wait_for_service('/gateway/gateway_info')
00095 
00096         self.advertise = rospy.ServiceProxy('/gateway/advertise',Advertise)
00097         self.advertiseAll = rospy.ServiceProxy('/gateway/advertise_all',AdvertiseAll)
00098         self.gatewayInfo = rospy.ServiceProxy('/gateway/gateway_info',GatewayInfo)
00099 
00100         # Make sure we are connected to the gateway first!
00101         while True:
00102             rospy.sleep(1.0)
00103             resp = self.gatewayInfo()
00104             if resp.connected:
00105                 break
00106             rospy.loginfo("TEST: Waiting for local gateway to be connected...")
00107 
00108         rospy.loginfo("TEST: Local gateway connected")
00109 
00110         # unit test property - show the difference when an assertion fails
00111         self.maxDiff = None
00112 
00113     def test_advertisePublisherByTopic(self):
00114         '''
00115           Tests advertising publishers. Also tests that multiple nodes are
00116           coming up as expected.
00117         '''
00118         # Test topic name directly
00119         watchlist = [Rule(ConnectionType.PUBLISHER, "/chatter", '')]
00120         expected_interface = list()
00121         expected_interface.append(Rule(ConnectionType.PUBLISHER, "/chatter", "/talker"))
00122         expected_interface.append(Rule(ConnectionType.PUBLISHER, "/chatter", "/talker2"))
00123         self.assertAdvertiseCall(watchlist, expected_interface)
00124 
00125         # Test topic name using regex
00126         watchlist = [Rule(ConnectionType.PUBLISHER, "/chat.*", '')]
00127         self.assertAdvertiseCall(watchlist, expected_interface)
00128 
00129         # Test bogus regex
00130         watchlist = [Rule(ConnectionType.PUBLISHER, "/chattt.*", '')]
00131         self.assertAdvertiseCall(watchlist, [])
00132 
00133     def test_advertisePublisherByNode(self):
00134         '''
00135           Tests advertising publishers. Also tests that multiple nodes are
00136           coming up as expected.
00137         '''
00138         watchlist = [Rule(ConnectionType.PUBLISHER, "/chatter", "/talker")]
00139         expected_interface = list()
00140         expected_interface.append(Rule(ConnectionType.PUBLISHER, "/chatter", "/talker"))
00141         self.assertAdvertiseCall(watchlist, expected_interface)
00142 
00143         # Test using regex
00144         watchlist = [Rule(ConnectionType.PUBLISHER, "/chat.*", ".*ker")]
00145         self.assertAdvertiseCall(watchlist, expected_interface)
00146 
00147     def test_advertiseDifferentConnections(self):
00148         '''
00149           Makes sure that every connection type is being detected and advertised
00150           appropriately.
00151         '''
00152         topics = {}
00153         topics[ConnectionType.PUBLISHER] = "/chatter"
00154         topics[ConnectionType.SUBSCRIBER] = "/chatter"
00155         topics[ConnectionType.SERVICE] = "/add_two_ints"
00156         topics[ConnectionType.ACTION_SERVER] = "/averaging_server/"
00157         topics[ConnectionType.ACTION_CLIENT] = "/fibonacci/"
00158         nodes = {}
00159         nodes[ConnectionType.PUBLISHER] = ["/talker","/talker2"]
00160         nodes[ConnectionType.SUBSCRIBER] = ["/listener"]
00161         nodes[ConnectionType.SERVICE] = ["/add_two_ints_server"]
00162         nodes[ConnectionType.ACTION_SERVER] = ["/averaging_server"]
00163         nodes[ConnectionType.ACTION_CLIENT] = ["/fibonacci_client"]
00164 
00165         watchlist = [Rule(type, topics[type], '') for type in topics]
00166         expected_interface = [Rule(type, topics[type], node) for type in topics for node in nodes[type]]
00167         self.assertAdvertiseCall(watchlist, expected_interface)
00168 
00169     def test_advertiseAll(self):
00170 
00171         topics = {}
00172         topics[ConnectionType.PUBLISHER] = "/chatter"
00173         topics[ConnectionType.SUBSCRIBER] = "/chatter"
00174         topics[ConnectionType.SERVICE] = "/add_two_ints"
00175         topics[ConnectionType.ACTION_SERVER] = "/averaging_server/"
00176         topics[ConnectionType.ACTION_CLIENT] = "/fibonacci/"
00177         nodes = {}
00178         nodes[ConnectionType.PUBLISHER] = ["/talker","/talker2"]
00179         nodes[ConnectionType.SUBSCRIBER] = ["/listener"]
00180         nodes[ConnectionType.SERVICE] = ["/add_two_ints_server"]
00181         nodes[ConnectionType.ACTION_SERVER] = ["/averaging_server"]
00182         nodes[ConnectionType.ACTION_CLIENT] = ["/fibonacci_client"]
00183 
00184         # no blacklist
00185         blacklist = []
00186         expected_interface = [Rule(type, topics[type], node) for type in topics for node in nodes[type]]
00187         expected_interface.append(Rule(ConnectionType.SUBSCRIBER, "/random_number", "/averaging_server"))
00188         self.assertAdvertiseAllCall(blacklist, expected_interface)
00189 
00190         # 2 items in blacklist
00191         blacklist = []
00192         blacklist.append(Rule(ConnectionType.PUBLISHER, "/chatter", ""))
00193         blacklist.append(Rule(ConnectionType.SUBSCRIBER, "/random_number", "/averaging_server"))
00194         expected_interface = [Rule(type, topics[type], node) for type in topics for node in nodes[type]]
00195         expected_interface[:] = [e for e in expected_interface if e.type != ConnectionType.PUBLISHER]
00196         self.assertAdvertiseAllCall(blacklist, expected_interface)
00197 
00198         # test regex in blacklist
00199         blacklist = []
00200         blacklist.append(Rule(ConnectionType.SUBSCRIBER, "/random.*", ""))
00201         expected_interface = [Rule(type, topics[type], node) for type in topics for node in nodes[type]]
00202         self.assertAdvertiseAllCall(blacklist, expected_interface)
00203 
00204         blacklist = []
00205         blacklist.append(Rule(ConnectionType.SUBSCRIBER, "/random2.*", "")) 
00206         expected_interface = [Rule(type, topics[type], node) for type in topics for node in nodes[type]]
00207         expected_interface.append(Rule(ConnectionType.SUBSCRIBER, "/random_number", "/averaging_server"))
00208         self.assertAdvertiseAllCall(blacklist, expected_interface)
00209 
00210         blacklist = []
00211         blacklist.append(Rule(ConnectionType.SUBSCRIBER, "/random.*", ".*raging.*"))
00212         expected_interface = [Rule(type, topics[type], node) for type in topics for node in nodes[type]]
00213         self.assertAdvertiseAllCall(blacklist, expected_interface)
00214 
00215     def tearDown(self):
00216         '''
00217           Called at the end of every test to ensure that all the advertisements
00218           are removed
00219         '''
00220         req = AdvertiseAllRequest()
00221         req.cancel = True
00222         self.advertiseAll(req)
00223         self.assertPublicInterface([])
00224 
00225 if __name__ == '__main__':
00226     rospy.init_node('multimaster_test')
00227     rostest.rosrun(PKG, 'test_advertisements_locally', TestAdvertisementsLocally) 
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Properties Friends


rocon_gateway_tests
Author(s): Jihoon
autogenerated on Tue Jan 15 2013 17:44:10