Go to the documentation of this file.00001
00002
00003 import sys, yaml
00004 import roslib; roslib.load_manifest('fs100')
00005 import rospy, rosbag
00006 from sensor_msgs.msg import JointState
00007 from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
00008
00009
00010 def print_usage():
00011 print '\nUsage:\n\n move_to_joint.py endPos [duration]'
00012 print ' where endPos is: "[J1, J2, J3,...]" in radians'
00013 print ' and duration (OPTIONAL) is the expected move duration (in seconds)'
00014 print '\n move_to_joint.py trajectory.bag [duration]'
00015 print ' will move to the first JointTrajectory point in a bag file'
00016 print ' where duration (OPTIONAL) is the expected move duration (in seconds)\n'
00017
00018
00019
00020
00021 def build_traj(start, end, duration):
00022
00023 if sorted(start.name) <> sorted(end.name):
00024 rospy.logerr('Start and End position joint_names mismatch')
00025 raise
00026
00027
00028 joint_names = start.name
00029
00030 start_pt = JointTrajectoryPoint()
00031 start_pt.positions = start.position
00032 start_pt.velocities = [0]*len(start.position)
00033 start_pt.time_from_start = rospy.Duration(0.0)
00034
00035 end_pt = JointTrajectoryPoint()
00036 for j in joint_names:
00037 idx = end.name.index(j)
00038 end_pt.positions.append(end.position[idx])
00039 end_pt.velocities.append(0)
00040 end_pt.time_from_start = rospy.Duration(duration)
00041
00042 return JointTrajectory(joint_names=joint_names, points=[start_pt, end_pt])
00043
00044
00045 def get_cur_pos():
00046 try:
00047 return rospy.wait_for_message("joint_states", JointState, 5.0)
00048 except (rospy.ROSException, rospy.ROSInterruptException):
00049 rospy.logerr('Unable to read current position')
00050 raise
00051
00052
00053 def wait_for_subs(pub, num_subs, min_time, timeout):
00054 end = rospy.Time.now() + rospy.Duration(timeout)
00055 rospy.sleep(min_time)
00056
00057 r = rospy.Rate(10)
00058 while (pub.get_num_connections() < num_subs) and (rospy.Time.now() < end) and not rospy.is_shutdown():
00059 r.sleep()
00060
00061 return (pub.get_num_connections() >= num_subs)
00062
00063
00064
00065
00066
00067 def move_to_joint(end_pos, duration):
00068
00069 traj = build_traj(get_cur_pos(), end_pos, duration)
00070
00071
00072 pub = rospy.Publisher('joint_path_command', JointTrajectory)
00073 if not wait_for_subs(pub, 1, 0.5, 2.0):
00074 rospy.logwarn('Timeout while waiting for subscribers. Publishing trajectory anyway.')
00075
00076 pub.publish(traj)
00077
00078
00079
00080 def get_pos_from_bag(bag_file, topic_name):
00081 with rosbag.Bag(bag_file) as bag:
00082 msgs = list(bag.read_messages(topic_name))
00083
00084 if len(msgs) <> 1:
00085 rospy.logwarn("Multiple trajectories found. Exporting first trajectory only")
00086
00087 traj = msgs[0][1]
00088
00089 return JointState(name=traj.joint_names, position=traj.points[0].positions)
00090
00091
00092
00093 def get_joint_names(num_joints):
00094
00095 if num_joints == 6:
00096 default_names = ['joint_'+j for j in ['s','l','u','r','b','t']]
00097 elif num_joints == 7:
00098 default_names = ['joint_'+j for j in ['s','l','e','u','r','b','t']]
00099 else:
00100 default_names = ''
00101
00102 return rospy.get_param('~joint_names', default_names)
00103
00104
00105 def parse_args(args):
00106 if len(args) < 1 or len(args) > 2:
00107 print_usage()
00108 raise ValueError("Illegal number of arguments")
00109
00110
00111 try:
00112 end_pos = get_pos_from_bag(args[0], 'joint_path_command')
00113 except:
00114 pos = yaml.load(args[0])
00115 names = get_joint_names(len(pos))
00116 end_pos = JointState(name=names, position=pos)
00117
00118 duration = float(args[1]) if len(args)>1 else 10.0
00119
00120 return (end_pos, duration)
00121
00122 def main(argv):
00123 rospy.init_node('move_to_joint')
00124
00125 try:
00126 (end_pos, duration) = parse_args(argv)
00127 move_to_joint(end_pos, duration)
00128 except:
00129 rospy.logerr('Unable to move to commanded position. Aborting.')
00130 raise
00131
00132 if __name__ == "__main__":
00133 main(sys.argv[1:])
00134