00001
00002 import numpy as np
00003 import sys
00004 from threading import Lock
00005
00006 import roslib
00007 roslib.load_manifest('hrl_ellipsoidal_control')
00008
00009 import rospy
00010 from geometry_msgs.msg import PoseStamped
00011
00012 import hrl_geom.transformations as trans
00013 from hrl_geom.pose_converter import PoseConv
00014
00015 def min_jerk_traj(n):
00016 return [(10 * t**3 - 15 * t**4 + 6 * t**5)
00017 for t in np.linspace(0, 1, n)]
00018
00019 class CartTrajController(object):
00020 def __init__(self):
00021 self._moving_lock = Lock()
00022 self._timer = None
00023
00024 def stop_moving(self, wait=False):
00025 self._stop_moving = True
00026 if wait:
00027 self.wait_until_stopped()
00028
00029 def is_moving(self):
00030 return self._timer is not None
00031
00032 def wait_until_stopped(self):
00033 if self._timer is not None:
00034 self._timer.join()
00035
00036 def execute_cart_traj(self, cart_arm, traj, time_step, blocking=True):
00037 if self._moving_lock.acquire(False):
00038 self._stop_moving = False
00039 self._is_blocking = blocking
00040 def execute_cart_traj_cb(event):
00041 self._cur_result = self._execute_cart_traj(cart_arm, traj, time_step)
00042 self._timer = None
00043 if not self._is_blocking:
00044 self._moving_lock.release()
00045 self._timer = rospy.Timer(rospy.Duration(0.00000001), execute_cart_traj_cb, oneshot=True)
00046 if self._is_blocking:
00047 self.wait_until_stopped()
00048 retval = self._cur_result
00049 self._moving_lock.release()
00050 return retval
00051 else:
00052 return True
00053 else:
00054 return False
00055
00056 def _execute_cart_traj(self, cart_arm, traj, time_step):
00057 rate = rospy.Rate(1.0/time_step)
00058 for ep in traj:
00059 if rospy.is_shutdown() or self._stop_moving:
00060 return False
00061 cart_arm.set_ep(ep)
00062 rate.sleep()
00063 return True
00064
00065 class CartesianStepController(CartTrajController):
00066 def __init__(self):
00067 super(CartesianStepController, self).__init__()
00068
00069 self.time_step = 1. / 20.
00070 self.arm = None
00071 self.cmd_lock = Lock()
00072 self.start_pub = rospy.Publisher("/start_pose", PoseStamped)
00073 self.end_pub = rospy.Publisher("/end_pose", PoseStamped)
00074
00075 def set_arm(self, arm):
00076 self.arm = arm
00077
00078 def _run_traj(self, traj, blocking=True):
00079 self.start_pub.publish(
00080 PoseConv.to_pose_stamped_msg("/torso_lift_link", traj[0]))
00081 self.end_pub.publish(
00082 PoseConv.to_pose_stamped_msg("/torso_lift_link", traj[-1]))
00083
00084 init_pos_tolerance = rospy.get_param("~init_pos_tolerance", 0.05)
00085 init_rot_tolerance = rospy.get_param("~init_rot_tolerance", np.pi/12)
00086 ee_pos, ee_rot = PoseConv.to_pos_rot(self.arm.get_end_effector_pose())
00087 _, rot_diff = PoseConv.to_pos_euler((ee_pos, ee_rot * traj[0][1].T))
00088 pos_diff = np.linalg.norm(ee_pos - traj[0][0])
00089 if pos_diff > init_pos_tolerance:
00090 rospy.logwarn("[controller_base] End effector too far from current position. " +
00091 "Pos diff: %.3f, Tolerance: %.3f" % (pos_diff, init_pos_tolerance))
00092 return False
00093 if np.linalg.norm(rot_diff) > init_rot_tolerance:
00094 rospy.logwarn("[controller_base] End effector too far from current rotation. " +
00095 "Rot diff: %.3f, Tolerance: %.3f" % (np.linalg.norm(rot_diff),
00096 init_rot_tolerance))
00097 return False
00098 return self.execute_cart_traj(self.arm, traj, self.time_step, blocking=blocking)
00099
00100 def execute_cart_move(self, change_ep, abs_sel, velocity=0.001,
00101 num_samps=None, blocking=True):
00102 if not self.cmd_lock.acquire(False):
00103 return False
00104 cur_pos, cur_rot = PoseConv.to_pos_rot(self.arm.get_ep())
00105 change_pos_ep, change_rot_ep = change_ep
00106 abs_cart_ep_sel, is_abs_rot = abs_sel
00107 pos_f = np.where(abs_cart_ep_sel, change_pos_ep,
00108 np.array(cur_pos + cur_rot * np.mat(change_pos_ep).T).T[0])
00109 if is_abs_rot:
00110 rot_mat_f = change_rot_ep
00111 else:
00112 rpy = change_rot_ep
00113 _, cur_rot = PoseConv.to_pos_rot(self.arm.get_ep())
00114 rot_mat = np.mat(trans.euler_matrix(*rpy))[:3,:3]
00115 rot_mat_f = cur_rot * rot_mat
00116 traj = self._create_cart_trajectory(pos_f, rot_mat_f, velocity, num_samps)
00117 retval = self._run_traj(traj, blocking=blocking)
00118 self.cmd_lock.release()
00119 return retval
00120
00121 def _create_cart_trajectory(self, pos_f, rot_mat_f, velocity=0.001, num_samps=None):
00122 cur_pos, cur_rot = PoseConv.to_pos_rot(self.arm.get_ep())
00123
00124 rpy = trans.euler_from_matrix(cur_rot.T * rot_mat_f)
00125
00126 if num_samps is None:
00127 num_samps = np.max([2, int(np.linalg.norm(pos_f - cur_pos) / velocity),
00128 int(np.linalg.norm(rpy) / velocity)])
00129
00130 traj = self.arm.interpolate_ep([cur_pos, cur_rot],
00131 [np.mat(pos_f).T, rot_mat_f],
00132 min_jerk_traj(num_samps))
00133 return traj