Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010 import roslib; roslib.load_manifest('rocon_gateway_demos')
00011 import rospy
00012 import rocon_gateway
00013 import rocon_gateway_demos
00014 from gateway_msgs.msg import *
00015 from gateway_msgs.srv import *
00016 import argparse
00017 import sys
00018
00019
00020
00021
00022
00023 class Context(object):
00024 def __init__(self, gateway, cancel_flag, regex):
00025 self.gateway = gateway
00026 if cancel_flag:
00027 self.action_text = "cancelling"
00028 else:
00029 self.action_text = "flipping"
00030 self.flip_service = rospy.ServiceProxy('/gateway/flip',Remote)
00031 self.req = RemoteRequest()
00032 self.req.cancel = cancel_flag
00033 self.req.remotes = []
00034 self.names, self.nodes = rocon_gateway_demos.createTutorialDictionaries(regex)
00035
00036 def flip(self, type):
00037 rule = gateway_msgs.msg.Rule()
00038 rule.name = self.names[type]
00039 rule.type = type
00040 rule.node = self.nodes[type]
00041 self.req.remotes.append(RemoteRule(self.gateway,rule))
00042 rospy.loginfo("Flip : %s [%s,%s,%s,%s]."%(self.action_text,
00043 self.gateway,
00044 rule.type,
00045 rule.name,
00046 rule.node or 'None'))
00047 resp = self.flip_service(self.req)
00048 if resp.result != 0:
00049 rospy.logerr("Flip : %s"%resp.error_message)
00050 self.req.remotes = []
00051
00052
00053
00054
00055
00056 """
00057 Tests flips, either for all tutorials (default) or one by one (via args).
00058
00059 Usage:
00060 1 > roslaunch rocon_gateway_demos pirate_hub.launch
00061 2a> roslaunch rocon_gateway_demos pirate_gateway_tutorials.launch
00062 3a> roslaunch rocon_gateway_demos pirate_gateway.launch
00063 2b> rosrun rocon_gateway_demos flip_tutorials.py
00064 3b> rostopic list
00065 2c> rosrun rocon_gateway_demos flip_tutorials.py --cancel
00066 2d> rosrun rocon_gateway_demos flip_tutorials.py --regex
00067 2e> rosrun rocon_gateway_demos flip_tutorials.py --regex --cancel
00068 """
00069
00070 if __name__ == '__main__':
00071
00072 parser = argparse.ArgumentParser(description='Flip roscpp tutorial connections (/chatter, /add_two_ints to a remote gateway')
00073 parser.add_argument('--pubonly', action='store_true', help='flip /chatter publisher only')
00074 parser.add_argument('--subonly', action='store_true', help='flip /chatter subscriber only')
00075 parser.add_argument('--serviceonly', action='store_true', help='flip add_two_ints service only')
00076 parser.add_argument('--actionclientonly', action='store_true', help='flip /fibonacci action client only')
00077 parser.add_argument('--actionserveronly', action='store_true', help='flip /averaging action server only')
00078 parser.add_argument('--regex', action='store_true', help='test with a regex pattern')
00079 parser.add_argument('--cancel', action='store_true', help='cancel the flip')
00080 args = parser.parse_args()
00081 flip_all_connection_types = (not args.pubonly) and (not args.subonly) and (not args.serviceonly) and (not args.actionclientonly) and (not args.actionserveronly)
00082 if args.cancel:
00083 action_text = "cancelling"
00084 else:
00085 action_text = "flipping"
00086
00087 rospy.init_node('flip_tutorials')
00088
00089 try:
00090 gateway = "pirate_gateway.*"
00091 except rocon_gateway.GatewayError as e:
00092 rospy.logerr("Flip Test : %s, aborting."%(str(e)))
00093 sys.exit(1)
00094
00095 context = Context(gateway, args.cancel, args.regex)
00096
00097 if args.pubonly or flip_all_connection_types:
00098 context.flip(ConnectionType.PUBLISHER)
00099
00100 if args.subonly or flip_all_connection_types:
00101 context.flip(ConnectionType.SUBSCRIBER)
00102
00103 if args.serviceonly or flip_all_connection_types:
00104 context.flip(ConnectionType.SERVICE)
00105
00106 if args.actionclientonly or flip_all_connection_types:
00107 context.flip(ConnectionType.ACTION_CLIENT)
00108
00109 if args.actionserveronly or flip_all_connection_types:
00110 context.flip(ConnectionType.ACTION_SERVER)