Go to the documentation of this file.00001 import copy
00002
00003 import roslib
00004 roslib.load_manifest('actionlib')
00005 roslib.load_manifest('pr2_controllers_msgs')
00006
00007 from Action import Action
00008 from actionlib import SimpleActionClient
00009 import math
00010 from pr2_controllers_msgs.msg import JointTrajectoryAction, JointTrajectoryGoal
00011 from trajectory_msgs.msg import JointTrajectoryPoint
00012 import rospy
00013
00014 class Pr2JointTrajectoryAction(Action):
00015
00016 def __init__(self, topic, transform_to_radian = True):
00017 super(Pr2JointTrajectoryAction, self).__init__()
00018 self._client = SimpleActionClient(topic, JointTrajectoryAction)
00019 self._joints = []
00020 self._values = []
00021 self._transform_to_radian = transform_to_radian
00022 self._timer = None
00023
00024 def values(self):
00025 return self._values
00026
00027 def set_values(self, values):
00028 assert(len(self._values) == len(values))
00029 for i, desc in enumerate(self._joints):
00030
00031 self._values[i] = max(desc['min'], min(values[i], desc['max']))
00032
00033 def get_labels(self):
00034 return [data['label'] for data in self._joints]
00035
00036 def get_value(self, label):
00037 indexes = [i for i, data in enumerate(self._joints) if data['label'] == label]
00038 if len(indexes) == 1:
00039 return self._values[indexes[0]]
00040 raise KeyError('joint with label "%s" not found' % label)
00041
00042 def update_value(self, label, value):
00043 indexes = [i for i, data in enumerate(self._joints) if data['label'] == label]
00044 if len(indexes) == 1:
00045 self._values[indexes[0]] = value
00046 return
00047 raise KeyError('joint with label "%s" not found' % label)
00048
00049 def to_string(self):
00050 data = []
00051 for value in self._values:
00052 data.append('%.0f' % value)
00053 return ','.join(data)
00054
00055 def deepcopy(self):
00056 action = super(Pr2JointTrajectoryAction, self).deepcopy()
00057 action._values = copy.deepcopy(self._values)
00058 return action
00059
00060 def serialize(self, stream):
00061 super(Pr2JointTrajectoryAction, self).serialize(stream)
00062 stream.serialize_data(self._values)
00063
00064 def deserialize(self, stream):
00065 super(Pr2JointTrajectoryAction, self).deserialize(stream)
00066 self._values = stream.deserialize_data()
00067
00068 def _add_joint(self, label, min, max, single_step):
00069 self._joints.append({'label': label, 'min': min, 'max': max, 'single_step': single_step})
00070
00071 self._values.append((max + min) / 2.0)
00072
00073 def execute(self):
00074 super(Pr2JointTrajectoryAction, self).execute()
00075 goal = JointTrajectoryGoal()
00076 point = JointTrajectoryPoint()
00077 goal.trajectory.joint_names = [''] * len(self._joints)
00078 point.positions = [0.0] * len(self._joints)
00079 point.velocities = [0.0] * len(self._joints)
00080 for i, desc in enumerate(self._joints):
00081 goal.trajectory.joint_names[i] = desc['label']
00082 point.positions[i] = self._values[i]
00083 if self._transform_to_radian:
00084 point.positions[i] *= math.pi / 180.0
00085 point.time_from_start = rospy.Duration.from_sec(self.get_duration())
00086 goal.trajectory.points = [ point ]
00087
00088 self._client.send_goal(goal)
00089 self._timer = rospy.Timer(rospy.Duration.from_sec(self.get_duration()), self._timer_finished, oneshot=True)
00090
00091 def _timer_finished(self, event):
00092
00093 self._timer = None
00094 self._execute_finished()