Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007 import roslib
00008 roslib.load_manifest('rocon_gateway_tests')
00009 import rospy
00010 from rocon_gateway import FlippedInterface, createEmptyConnectionTypeDictionary, Connection
00011 from gateway_msgs.msg import Rule, ConnectionType, RemoteRule
00012 import argparse
00013
00014 """
00015 Tests various functions in the flipped interface class.
00016
00017 Usage:
00018 > rosrun rocon_gateway_tests flipped_interface
00019 """
00020
00021 def print_flips(new_flips, old_flips):
00022 print ""
00023 print "NEW FLIPS:"
00024 print ""
00025 print new_flips
00026 print ""
00027 print "OLD FLIPS:"
00028 print ""
00029 print old_flips
00030 print ""
00031
00032 if __name__ == '__main__':
00033
00034 parser = argparse.ArgumentParser(description='Tests the update method in the flipped interface.')
00035
00036
00037
00038
00039
00040 rospy.init_node('flipped_interface')
00041 flipped_interface = FlippedInterface(default_rule_blacklist=[])
00042
00043
00044
00045
00046 connection_rule = Rule()
00047 connection_rule.type=ConnectionType.PUBLISHER
00048 connection_rule.name="/foo"
00049 connection_rule.node="/bar"
00050 system_connections = createEmptyConnectionTypeDictionary()
00051 connection = Connection(connection_rule,"std_msgs/String","http://localhost:5321")
00052 system_connections[ConnectionType.PUBLISHER].append(connection)
00053
00054 flip_rule = RemoteRule()
00055 flip_rule.gateway='gateway1'
00056 flip_rule.rule.name='/chatter'
00057 flip_rule.rule.type=ConnectionType.PUBLISHER
00058
00059 rospy.loginfo("************* Update after a new flip rule *************")
00060 rospy.loginfo("Expected Result: the system connection is not yet present")
00061 rospy.loginfo("so should return empty results (no matches) from the update.")
00062 flipped_interface.add_rule(flip_rule)
00063 new_flips, old_flips = flipped_interface.update(system_connections)
00064
00065 print_flips(new_flips,old_flips)
00066
00067
00068 rospy.loginfo("************* Update after system connection acquired *************")
00069 rospy.loginfo("Expected Result: should see a match and a new item in new_flips.")
00070
00071 connection_rule = Rule()
00072 connection_rule.type=ConnectionType.PUBLISHER
00073 connection_rule.name="/chatter"
00074 connection_rule.node="/talker"
00075 system_connections = createEmptyConnectionTypeDictionary()
00076 connection = Connection(connection_rule,"std_msgs/String","http://localhost:5321")
00077 system_connections[ConnectionType.PUBLISHER].append(connection)
00078
00079 new_flips, old_flips = flipped_interface.update(system_connections)
00080 print_flips(new_flips,old_flips)
00081
00082 rospy.loginfo("************* Update after flip rule removed acquired *************")
00083 rospy.loginfo("Expected Result: should see a match existing in old flips.")
00084
00085 if not flipped_interface.remove_rule(flip_rule):
00086 rospy.logerr("Failed to remove an existing rule from the flip rules.")
00087
00088 new_flips, old_flips = flipped_interface.update(system_connections)
00089 print_flips(new_flips,old_flips)
00090 rospy.loginfo("************* Done *************")
00091