00001
00002
00003
00004
00005
00006
00007 PKG = 'rocon_gateway_tests'
00008 import roslib; roslib.load_manifest(PKG)
00009 import rospy
00010 import rostest
00011 from gateway_msgs.msg import *
00012 from gateway_msgs.srv import *
00013 import unittest
00014 import std_msgs
00015 import copy
00016
00017 class TestAdvertisementsLocally(unittest.TestCase):
00018
00019 def assertAdvertiseAllCall(self, blacklist, expected_interface):
00020 req = AdvertiseAllRequest()
00021 req.cancel = False
00022 req.blacklist = copy.deepcopy(blacklist)
00023
00024
00025 resp = self.advertiseAll(req)
00026 self.assertEquals(resp.result, Result.SUCCESS)
00027
00028
00029 self.assertPublicInterface(expected_interface)
00030
00031
00032 req.cancel = True
00033 resp = self.advertiseAll(req)
00034 self.assertEquals(resp.result, Result.SUCCESS)
00035
00036 self.assertPublicInterface([])
00037
00038 def assertAdvertiseCall(self, watchlist, expected_interface):
00039
00040
00041 req = AdvertiseRequest()
00042 req.rules = copy.deepcopy(watchlist)
00043 req.cancel = False
00044
00045
00046 resp = self.advertise(req)
00047 self.assertEquals(resp.result, Result.SUCCESS)
00048 self.assertEquals(len(resp.watchlist), len(watchlist))
00049 for rule in resp.watchlist:
00050 self.assertIn(rule, watchlist)
00051
00052
00053 self.assertPublicInterface(expected_interface)
00054
00055
00056 req.cancel = True
00057 resp = self.advertise(req)
00058 self.assertEquals(resp.result, Result.SUCCESS)
00059 self.assertEquals(len(resp.watchlist), 0)
00060 self.assertPublicInterface([])
00061
00062 def assertPublicInterface(self, rules):
00063
00064 expected_rules = copy.deepcopy(rules)
00065 num_nodes = len(expected_rules)
00066 while True:
00067 rospy.sleep(1.0)
00068 resp = self.gatewayInfo()
00069 node_names = []
00070 for i in resp.public_interface:
00071 node_names.append(i.node)
00072 rospy.loginfo("TEST: Public Interface Nodes: %s"%str(node_names))
00073 if len(resp.public_interface) == num_nodes:
00074 break
00075 rospy.loginfo("TEST: Waiting for watcher thread to load all nodes.")
00076
00077 for i in range(num_nodes):
00078 self.assertIn(resp.public_interface[i], expected_rules)
00079 expected_rules.remove(resp.public_interface[i])
00080
00081 self.assertEqual(len(expected_rules), 0);
00082 rospy.loginfo("TEST: Public interface found as expected")
00083
00084 def setUp(self):
00085 '''
00086 Run at the start of every test. This function performs the following:
00087 1. Ensure that gateway services are available
00088 2. Make sure gateway is connected to the hub
00089 If something goes wrong and the setup can not complete successfully,
00090 the test will timeout with an error
00091 '''
00092 rospy.wait_for_service('/gateway/advertise')
00093 rospy.wait_for_service('/gateway/advertise_all')
00094 rospy.wait_for_service('/gateway/gateway_info')
00095
00096 self.advertise = rospy.ServiceProxy('/gateway/advertise',Advertise)
00097 self.advertiseAll = rospy.ServiceProxy('/gateway/advertise_all',AdvertiseAll)
00098 self.gatewayInfo = rospy.ServiceProxy('/gateway/gateway_info',GatewayInfo)
00099
00100
00101 while True:
00102 rospy.sleep(1.0)
00103 resp = self.gatewayInfo()
00104 if resp.connected:
00105 break
00106 rospy.loginfo("TEST: Waiting for local gateway to be connected...")
00107
00108 rospy.loginfo("TEST: Local gateway connected")
00109
00110
00111 self.maxDiff = None
00112
00113 def test_advertisePublisherByTopic(self):
00114 '''
00115 Tests advertising publishers. Also tests that multiple nodes are
00116 coming up as expected.
00117 '''
00118
00119 watchlist = [Rule(ConnectionType.PUBLISHER, "/chatter", '')]
00120 expected_interface = list()
00121 expected_interface.append(Rule(ConnectionType.PUBLISHER, "/chatter", "/talker"))
00122 expected_interface.append(Rule(ConnectionType.PUBLISHER, "/chatter", "/talker2"))
00123 self.assertAdvertiseCall(watchlist, expected_interface)
00124
00125
00126 watchlist = [Rule(ConnectionType.PUBLISHER, "/chat.*", '')]
00127 self.assertAdvertiseCall(watchlist, expected_interface)
00128
00129
00130 watchlist = [Rule(ConnectionType.PUBLISHER, "/chattt.*", '')]
00131 self.assertAdvertiseCall(watchlist, [])
00132
00133 def test_advertisePublisherByNode(self):
00134 '''
00135 Tests advertising publishers. Also tests that multiple nodes are
00136 coming up as expected.
00137 '''
00138 watchlist = [Rule(ConnectionType.PUBLISHER, "/chatter", "/talker")]
00139 expected_interface = list()
00140 expected_interface.append(Rule(ConnectionType.PUBLISHER, "/chatter", "/talker"))
00141 self.assertAdvertiseCall(watchlist, expected_interface)
00142
00143
00144 watchlist = [Rule(ConnectionType.PUBLISHER, "/chat.*", ".*ker")]
00145 self.assertAdvertiseCall(watchlist, expected_interface)
00146
00147 def test_advertiseDifferentConnections(self):
00148 '''
00149 Makes sure that every connection type is being detected and advertised
00150 appropriately.
00151 '''
00152 topics = {}
00153 topics[ConnectionType.PUBLISHER] = "/chatter"
00154 topics[ConnectionType.SUBSCRIBER] = "/chatter"
00155 topics[ConnectionType.SERVICE] = "/add_two_ints"
00156 topics[ConnectionType.ACTION_SERVER] = "/averaging_server/"
00157 topics[ConnectionType.ACTION_CLIENT] = "/fibonacci/"
00158 nodes = {}
00159 nodes[ConnectionType.PUBLISHER] = ["/talker","/talker2"]
00160 nodes[ConnectionType.SUBSCRIBER] = ["/listener"]
00161 nodes[ConnectionType.SERVICE] = ["/add_two_ints_server"]
00162 nodes[ConnectionType.ACTION_SERVER] = ["/averaging_server"]
00163 nodes[ConnectionType.ACTION_CLIENT] = ["/fibonacci_client"]
00164
00165 watchlist = [Rule(type, topics[type], '') for type in topics]
00166 expected_interface = [Rule(type, topics[type], node) for type in topics for node in nodes[type]]
00167 self.assertAdvertiseCall(watchlist, expected_interface)
00168
00169 def test_advertiseAll(self):
00170
00171 topics = {}
00172 topics[ConnectionType.PUBLISHER] = "/chatter"
00173 topics[ConnectionType.SUBSCRIBER] = "/chatter"
00174 topics[ConnectionType.SERVICE] = "/add_two_ints"
00175 topics[ConnectionType.ACTION_SERVER] = "/averaging_server/"
00176 topics[ConnectionType.ACTION_CLIENT] = "/fibonacci/"
00177 nodes = {}
00178 nodes[ConnectionType.PUBLISHER] = ["/talker","/talker2"]
00179 nodes[ConnectionType.SUBSCRIBER] = ["/listener"]
00180 nodes[ConnectionType.SERVICE] = ["/add_two_ints_server"]
00181 nodes[ConnectionType.ACTION_SERVER] = ["/averaging_server"]
00182 nodes[ConnectionType.ACTION_CLIENT] = ["/fibonacci_client"]
00183
00184
00185 blacklist = []
00186 expected_interface = [Rule(type, topics[type], node) for type in topics for node in nodes[type]]
00187 expected_interface.append(Rule(ConnectionType.SUBSCRIBER, "/random_number", "/averaging_server"))
00188 self.assertAdvertiseAllCall(blacklist, expected_interface)
00189
00190
00191 blacklist = []
00192 blacklist.append(Rule(ConnectionType.PUBLISHER, "/chatter", ""))
00193 blacklist.append(Rule(ConnectionType.SUBSCRIBER, "/random_number", "/averaging_server"))
00194 expected_interface = [Rule(type, topics[type], node) for type in topics for node in nodes[type]]
00195 expected_interface[:] = [e for e in expected_interface if e.type != ConnectionType.PUBLISHER]
00196 self.assertAdvertiseAllCall(blacklist, expected_interface)
00197
00198
00199 blacklist = []
00200 blacklist.append(Rule(ConnectionType.SUBSCRIBER, "/random.*", ""))
00201 expected_interface = [Rule(type, topics[type], node) for type in topics for node in nodes[type]]
00202 self.assertAdvertiseAllCall(blacklist, expected_interface)
00203
00204 blacklist = []
00205 blacklist.append(Rule(ConnectionType.SUBSCRIBER, "/random2.*", ""))
00206 expected_interface = [Rule(type, topics[type], node) for type in topics for node in nodes[type]]
00207 expected_interface.append(Rule(ConnectionType.SUBSCRIBER, "/random_number", "/averaging_server"))
00208 self.assertAdvertiseAllCall(blacklist, expected_interface)
00209
00210 blacklist = []
00211 blacklist.append(Rule(ConnectionType.SUBSCRIBER, "/random.*", ".*raging.*"))
00212 expected_interface = [Rule(type, topics[type], node) for type in topics for node in nodes[type]]
00213 self.assertAdvertiseAllCall(blacklist, expected_interface)
00214
00215 def tearDown(self):
00216 '''
00217 Called at the end of every test to ensure that all the advertisements
00218 are removed
00219 '''
00220 req = AdvertiseAllRequest()
00221 req.cancel = True
00222 self.advertiseAll(req)
00223 self.assertPublicInterface([])
00224
00225 if __name__ == '__main__':
00226 rospy.init_node('multimaster_test')
00227 rostest.rosrun(PKG, 'test_advertisements_locally', TestAdvertisementsLocally)