20 from actionlib
import SimpleActionClient
21 from control_msgs.msg
import FollowJointTrajectoryAction, \
22 FollowJointTrajectoryGoal
23 from moveit_commander
import MoveGroupCommander, RobotCommander, \
24 PlanningSceneInterface
25 from moveit_msgs.msg
import RobotTrajectory, PositionIKRequest
26 from sensor_msgs.msg
import JointState
27 import geometry_msgs.msg
28 from sr_robot_msgs.srv
import RobotTeachMode, RobotTeachModeRequest, \
29 RobotTeachModeResponse
31 from moveit_msgs.srv
import GetPositionIK
32 from moveit_msgs.srv
import ListRobotStatesInWarehouse
as ListStates
33 from moveit_msgs.srv
import GetRobotStateFromWarehouse
as GetState
34 from moveit_msgs.msg
import OrientationConstraint, Constraints
36 from trajectory_msgs.msg
import JointTrajectoryPoint, JointTrajectory
37 from math
import radians
39 from moveit_msgs.srv
import GetPositionFK
40 from std_msgs.msg
import Header
59 Base class for hand and arm commanders. 64 Initialize MoveGroupCommander object. 65 @param name - name of the MoveIt group. 82 rospy.Subscriber(
"joint_states", JointState,
93 rospy.wait_for_service(
'compute_ik')
94 self.
_compute_ik = rospy.ServiceProxy(
'compute_ik', GetPositionIK)
95 self.
_forward_k = rospy.ServiceProxy(
'compute_fk', GetPositionFK)
97 controller_list_param = rospy.get_param(
"/move_group/controller_list")
100 self.
_controllers = {item[
"name"]: item[
"joints"]
for item
in controller_list_param}
107 threading.Thread(
None, rospy.spin)
111 Sets the planner_id used for all future planning requests. 112 @param planner_id - The string for the planner id, set to None to clear. 121 Specifies the amount of time to be used for motion planning. 122 Some planning algorithms might require more time than others to compute 135 response = self.
_forward_k(header, fk_link_names, state)
136 return response.pose_stamped[0]
140 @return - returns the name of the frame wrt which the motion planning 147 Set the reference frame to assume for poses of end-effectors. 148 @param reference_frame - name of the frame. 161 Set a scaling factor for optionally reducing the maximum joint velocity. 162 @param value - Allowed values are in (0,1]. 168 Set a scaling factor for optionally reducing the maximum joint accelaration. 169 @param value - Allowed values are in (0,1]. 175 Enable/disable looking around for motion planning. 176 @param value - boolean. 182 Enable/disable replanning in case new obstacles are detected 183 while executing a plan. 184 @param value - boolean. 190 Executes the last plan made. 196 rospy.logwarn(
"No plans were made, not executing anything.")
200 Executes a given plan. 201 @param plan - RobotTrajectory msg that contains the trajectory 202 to the set goal state. 208 rospy.logwarn(
"Plan is not valid, not executing anything.")
211 angle_degrees=False):
213 Set target of the robot's links and moves to it. 214 @param joint_states - dictionary with joint name and value. It can 215 contain only joints values of which need to be changed. 216 @param wait - should method wait for movement end or not. 217 @param angle_degrees - are joint_states in degrees or not. 219 joint_states_cpy = copy.deepcopy(joint_states)
222 joint_states_cpy.update((joint, radians(i))
223 for joint, i
in joint_states_cpy.items())
230 Set target of the robot's links and plans. 231 @param joint_states - dictionary with joint name and value. It can 232 contain only joints values of which need to be changed. 233 @param angle_degrees - are joint_states in degrees or not. 234 @param custom_start_state - specify a start state different than the current state 235 This is a blocking method. 236 @return - motion plan (RobotTrajectory msg) that contains the trajectory to the set goal state. 238 joint_states_cpy = copy.deepcopy(joint_states)
241 joint_states_cpy.update((joint, radians(i))
242 for joint, i
in joint_states_cpy.items())
243 if custom_start_state
is None:
253 Checks if current plan contains a valid trajectory 255 return (self.
__plan is not None and len(self.
__plan.joint_trajectory.points) > 0)
259 Checks if given plan contains a valid trajectory 261 return (plan
is not None and len(plan.joint_trajectory.points) > 0)
265 Returns given plan quality calculated by a weighted sum of angles traveled by each 266 of the joints, giving higher weights to the joints closer to the base of the robot, 267 thus penalizing them as smallmovements of these joints will result in bigger movements 268 of the end effector. Formula: 270 PQ = sum_(i=0)^(n-1){w_i * abs(x_i - x_(i0)}, where: 272 n - number of robot's joints, 273 w - weight specified for each joint, 274 x - joint's goal position, 275 x_0 - joint's initial position. 277 The lower the value, the better the plan. 283 num_of_joints = len(plan.joint_trajectory.points[0].positions)
284 weights = numpy.array(sorted(range(1, num_of_joints + 1), reverse=
True))
285 plan_array = numpy.empty(shape=(len(plan.joint_trajectory.points),
288 for i, point
in enumerate(plan.joint_trajectory.points):
289 plan_array[i] = point.positions
291 deltas = abs(numpy.diff(plan_array, axis=0))
292 sum_deltas = numpy.sum(deltas, axis=0)
293 sum_deltas_weighted = sum_deltas * weights
294 plan_quality = float(numpy.sum(sum_deltas_weighted))
301 if plan_quality > medium_threshold:
302 rospy.logwarn(
"Low plan quality! Value: {}".format(plan_quality))
304 elif (plan_quality > good_threshold
and 305 plan_quality < medium_threshold):
306 rospy.loginfo(
"Medium plan quality. Value: {}".format(plan_quality))
308 elif plan_quality < good_threshold:
309 rospy.loginfo(
"Good plan quality. Value: {}".format(plan_quality))
320 Set a joint configuration by name. 321 @param name - name of the target which must correspond to a name defined, 322 either in the srdf or in the mongo warehouse database. 323 @return - bool to confirm that the target has been correctly set. 331 joints = response.state.joint_state.name
332 positions = response.state.joint_state.position
335 for n, this_name
in enumerate(joints):
336 if this_name
in active_names:
337 js[this_name] = positions[n]
340 except Exception
as e:
343 rospy.logerr(
"Unknown named state '%s'..." % name)
349 Get the joint angles for targets specified by name. 350 @param name - @param name - name of the target which must correspond to a name defined, 351 either in the srdf or in the mongo warehouse database. 352 @return - joint values of the named target. 363 for x, n
in enumerate(js.name):
365 output[n] = js.position[x]
368 rospy.logerr(
"No target named %s" % name)
379 Get the current pose of the end effector. 380 @param reference_frame - The desired reference frame in which end effector pose should be returned. 381 If none is passed, it will use the planning frame as reference. 382 @return - geometry_msgs.msg.Pose() - current pose of the end effector. 384 if reference_frame
is not None:
386 trans = self.
tf_buffer.lookup_transform(reference_frame,
390 current_pose = geometry_msgs.msg.Pose()
391 current_pose.position.x = trans.transform.translation.x
392 current_pose.position.y = trans.transform.translation.y
393 current_pose.position.z = trans.transform.translation.z
394 current_pose.orientation.x = trans.transform.rotation.x
395 current_pose.orientation.y = trans.transform.rotation.y
396 current_pose.orientation.z = trans.transform.rotation.z
397 current_pose.orientation.w = trans.transform.rotation.w
399 except (tf2_ros.LookupException, tf2_ros.ConnectivityException, tf2_ros.ExtrapolationException):
401 " in " + reference_frame +
" reference frame")
408 Get the current joint state of the group being used. 409 @return - a dictionary with the joint names as keys and current joint values. 414 return dict(zip(joint_names, joint_values))
418 Get the current joint state of the group being used, enforcing that they are within each joint limits. 419 @return - a dictionary with the joint names as keys and current joint values. 423 output = {n: current[n]
for n
in names
if n
in current}
432 Set target of the robot's links and moves to it 433 @param name - name of the target pose defined in SRDF 434 @param wait - should method wait for movement end or not 442 Set target of the robot's links and plans 443 This is a blocking method. 444 @param name - name of the target pose defined in SRDF. 445 @param custom_start_state - specify a start state different than the current state. 446 @return - a motion plan (RobotTrajectory msg) that contains the trajectory to the named target. 448 if custom_start_state
is None:
457 list_srv = rospy.ServiceProxy(
"list_robot_states", ListStates)
460 except rospy.ServiceException
as exc:
461 rospy.logwarn(
"Couldn't access warehouse: " + str(exc))
475 Get the complete list of named targets, from SRDF 476 as well as warehouse poses if available. 477 @return - list of strings containing names of targets. 483 Returns joints position. 484 @return - dictionary with joints positions. 491 Returns joints velocities 492 @return - dictionary with joints velocities. 499 Returns joints effort. 500 @return - dictionary with joints efforts. 508 @return - JointState message 515 Moves robot through all joint states with specified timeouts. 516 @param joint_trajectory - JointTrajectory class object. Represents 517 trajectory of the joints which would be executed. 519 plan = RobotTrajectory()
520 plan.joint_trajectory = joint_trajectory
525 Makes joint value trajectory from specified by named poses (either from 526 SRDF or from warehouse). 527 @param trajectory - list of waypoints, each waypoint is a dict with 528 the following elements (n.b either name or joint_angles is required) 529 - name -> the name of the way point 530 - joint_angles -> a dict of joint names and angles 531 - interpolate_time -> time to move from last wp 532 - pause_time -> time to wait at this wp 533 - degrees -> set to true if joint_angles is specified in degrees. Assumed false if absent. 537 joint_trajectory = JointTrajectory()
538 joint_names = current.keys()
539 joint_trajectory.joint_names = joint_names
541 start = JointTrajectoryPoint()
542 start.positions = current.values()
543 start.time_from_start = rospy.Duration.from_sec(0.001)
544 joint_trajectory.points.append(start)
546 time_from_start = 0.0
548 for wp
in trajectory:
550 joint_positions =
None 551 if 'name' in wp.keys():
553 elif 'joint_angles' in wp.keys():
554 joint_positions = copy.deepcopy(wp[
'joint_angles'])
555 if 'degrees' in wp.keys()
and wp[
'degrees']:
556 for joint, angle
in joint_positions.iteritems():
557 joint_positions[joint] = radians(angle)
559 if joint_positions
is None:
560 rospy.logerr(
"Invalid waypoint. Must contain valid name for named target or dict of joint angles.")
565 for n
in joint_names:
566 new_positions[n] = joint_positions[n]
if n
in joint_positions
else current[n]
568 trajectory_point = JointTrajectoryPoint()
569 trajectory_point.positions = [new_positions[n]
for n
in joint_names]
571 current = new_positions
573 time_from_start += wp[
'interpolate_time']
574 trajectory_point.time_from_start = rospy.Duration.from_sec(time_from_start)
575 joint_trajectory.points.append(trajectory_point)
577 if 'pause_time' in wp
and wp[
'pause_time'] > 0:
578 extra = JointTrajectoryPoint()
579 extra.positions = trajectory_point.positions
580 time_from_start += wp[
'pause_time']
581 extra.time_from_start = rospy.Duration.from_sec(time_from_start)
582 joint_trajectory.points.append(extra)
584 return joint_trajectory
588 Sends a trajectory of all active joints at their current position. 589 This stops the robot. 594 trajectory_point = JointTrajectoryPoint()
595 trajectory_point.positions = current.values()
596 trajectory_point.time_from_start = rospy.Duration.from_sec(0.1)
598 trajectory = JointTrajectory()
599 trajectory.points.append(trajectory_point)
600 trajectory.joint_names = current.keys()
606 Moves robot through trajectory specified by named poses, either from 607 SRDF or from warehouse. Runs trajectory directly via contoller. 608 @param trajectory - list of waypoints, each waypoint is a dict with 609 the following elements: 610 - name -> the name of the way point 611 - interpolate_time -> time to move from last wp 612 - pause_time -> time to wait at this wp 615 if joint_trajectory
is not None:
620 Moves robot through trajectory specified by named poses, either from 621 SRDF or from warehouse. Runs trajectory via moveit. 622 @param trajectory - list of waypoints, each waypoint is a dict with 623 the following elements: 624 - name -> the name of the way point 625 - interpolate_time -> time to move from last wp 626 - pause_time -> time to wait at this wp 629 if joint_trajectory
is not None:
634 Specify a target position for the end-effector and moves to it. 635 @param xyz - new position of end-effector. 636 @param end_effector_link - name of the end effector link. 637 @param wait - should method wait for movement end or not. 645 Specify a target position for the end-effector and plans. 646 This is a blocking method. 647 @param xyz - new position of end-effector. 648 @param end_effector_link - name of the end effector link. 649 @param custom_start_state - specify a start state different than the current state. 651 if custom_start_state
is None:
660 Specify a target pose for the end-effector and moves to it 661 @param pose - new pose of end-effector: a Pose message, a PoseStamped 662 message or a list of 6 floats: [x, y, z, rot_x, rot_y, rot_z] or a list 663 of 7 floats [x, y, z, qx, qy, qz, qw]. 664 @param end_effector_link - name of the end effector link. 665 @param wait - should method wait for movement end or not. 671 def plan_to_pose_target(self, pose, end_effector_link="", alternative_method=False, custom_start_state=None):
673 Specify a target pose for the end-effector and plans. 674 This is a blocking method. 675 @param pose - new pose of end-effector: a Pose message, a PoseStamped 676 message or a list of 6 floats: [x, y, z, rot_x, rot_y, rot_z] or a list 677 of 7 floats [x, y, z, qx, qy, qz, qw]. 678 @param end_effector_link - name of the end effector link. 679 @param alternative_method - use set_joint_value_target instead of set_pose_target. 680 @param custom_start_state - specify a start state different than the current state. 682 if custom_start_state
is None:
686 if alternative_method:
695 The callback function for the topic joint_states. 696 It will store the received joint position, velocity and efforts 697 information into dictionaries. 698 @param joint_state - the message containing the joints data. 703 zip(joint_state.name,
704 joint_state.position)}
706 zip(joint_state.name,
707 joint_state.velocity)}
709 zip(joint_state.name, joint_state.effort)}
713 Sets up an action client to communicate with the trajectory controller. 717 for controller_name
in controller_list.keys():
719 service_name = controller_name +
"/follow_joint_trajectory" 721 FollowJointTrajectoryAction)
722 if self.
_clients[controller_name].wait_for_server(timeout=rospy.Duration(4))
is False:
723 err_msg =
'Failed to connect to action server ({}) in 4 sec'.format(service_name)
724 rospy.logwarn(err_msg)
727 wait=True, angle_degrees=False):
729 Set target of the robot's links and moves to it. 730 @param joint_states - dictionary with joint name and value. It can 731 contain only joints values of which need to be changed. 732 @param time - time in s (counting from now) for the robot to reach the 733 target (it needs to be greater than 0.0 for it not to be rejected by 734 the trajectory controller). 735 @param wait - should method wait for movement end or not. 736 @param angle_degrees - are joint_states in degrees or not. 741 joint_states_cpy = copy.deepcopy(joint_states)
744 joint_states_cpy.update((joint, radians(i))
745 for joint, i
in joint_states_cpy.items())
749 goal = FollowJointTrajectoryGoal()
750 goal.trajectory.joint_names = []
751 point = JointTrajectoryPoint()
754 for x
in joint_states_cpy.keys():
755 if x
in controller_joints:
756 goal.trajectory.joint_names.append(x)
757 point.positions.append(joint_states_cpy[x])
759 point.time_from_start = rospy.Duration.from_sec(time)
761 goal.trajectory.points = [point]
763 goals[controller] = goal
771 if not self.
_clients[i].wait_for_result():
772 rospy.loginfo(
"Trajectory not completed")
775 if controller
is not None:
779 if controller_running:
790 goals[client],
lambda terminal_state, result: self.
_action_done_cb(client, terminal_state, result))
794 Moves robot through all joint states with specified timeouts. 795 @param joint_trajectory - JointTrajectory class object. Represents 796 trajectory of the joints which would be executed. 797 @param wait - should method wait for movement end or not. 802 goal = FollowJointTrajectoryGoal()
803 goal.trajectory = copy.deepcopy(joint_trajectory)
805 indices_of_joints_in_this_controller = []
807 for i, joint
in enumerate(joint_trajectory.joint_names):
808 if joint
in controller_joints:
809 indices_of_joints_in_this_controller.append(i)
811 goal.trajectory.joint_names = [
812 joint_trajectory.joint_names[i]
for i
in indices_of_joints_in_this_controller]
814 for point
in goal.trajectory.points:
816 point.positions = [point.positions[i]
for i
in indices_of_joints_in_this_controller]
818 point.velocities = [point.velocities[i]
for i
in indices_of_joints_in_this_controller]
820 point.effort = [point.effort[i]
for i
in indices_of_joints_in_this_controller]
822 goals[controller] = goal
830 if not self.
_clients[i].wait_for_result():
831 rospy.loginfo(
"Trajectory not completed")
834 eef_step=0.005, jump_threshold=0.0, custom_start_state=None):
836 Specify a set of waypoints for the end-effector and plans. 837 This is a blocking method. 838 @param reference_frame - the reference frame in which the waypoints are given. 839 @param waypoints - an array of poses of end-effector. 840 @param eef_step - configurations are computed for every eef_step meters. 841 @param jump_threshold - maximum distance in configuration space between consecutive points in the 843 @param custom_start_state - specify a start state different than the current state. 844 @return - motion plan (RobotTrajectory msg) that contains the trajectory to the set wayapoints targets. 846 if custom_start_state
is None:
851 if reference_frame
is not None:
855 return self.
__plan, fraction
859 Activates/deactivates the teach mode for the robot. 860 Activation: stops the the trajectory controllers for the robot, and 861 sets it to teach mode. 862 Deactivation: stops the teach mode and starts trajectory controllers 864 Currently this method blocks for a few seconds when called on a hand, 865 while the hand parameters are reloaded. 866 @param teach - bool to activate or deactivate teach mode 870 mode = RobotTeachModeRequest.TEACH_MODE
872 mode = RobotTeachModeRequest.TRAJECTORY_MODE
877 Make and execute a plan from the current state to the first state in an pre-existing trajectory. 878 @param trajectory - moveit_msgs/JointTrajectory. 879 @param wait - Bool to specify if movement should block untill finished. 882 if len(trajectory.points) <= 0:
883 rospy.logerr(
"Trajectory has no points in it, can't reverse...")
886 first_point = trajectory.points[0]
887 end_state = dict(zip(trajectory.joint_names, first_point.positions))
892 teach_mode_client = rospy.ServiceProxy(
'/teach_mode', RobotTeachMode)
894 req = RobotTeachModeRequest()
895 req.teach_mode = mode
898 resp = teach_mode_client(req)
899 if resp.result == RobotTeachModeResponse.ERROR:
900 rospy.logerr(
"Failed to change robot %s to mode %d", robot,
903 rospy.loginfo(
"Changed robot %s to mode %d Result = %d", robot,
905 except rospy.ServiceException:
906 rospy.logerr(
"Failed to call service teach_mode")
908 def get_ik(self, target_pose, avoid_collisions=False, joint_states=None, ik_constraints=None):
911 Computes the inverse kinematics for a given pose. It returns a JointState. 912 @param target_pose - A given pose of type PoseStamped. 913 @param avoid_collisions - Find an IK solution that avoids collisions. By default, this is false. 914 @param joint_states - initial joint configuration of type JointState from which the IK solution is computed. 915 If set to None, the current joint state is retrieved automatically. 916 @param ik_constraints - Set constraints of type Constraints for computing the IK solution. 918 service_request = PositionIKRequest()
919 service_request.group_name = self.
_name 921 service_request.pose_stamped = target_pose
922 service_request.timeout.secs = 0.5
923 service_request.avoid_collisions = avoid_collisions
924 if ik_constraints
is not None:
925 service_request.constraints = ik_constraints
926 if joint_states
is None:
929 service_request.robot_state.joint_state = joint_states
932 resp = self.
_compute_ik(ik_request=service_request)
934 if resp.error_code.val != 1:
935 if resp.error_code.val == -10:
936 rospy.logerr(
"Unreachable point: Start state in collision")
937 elif resp.error_code.val == -12:
938 rospy.logerr(
"Unreachable point: Goal state in collision")
939 elif resp.error_code.val == -31:
940 rospy.logerr(
"Unreachable point: No IK solution")
942 rospy.logerr(
"Unreachable point (error: %s)" % resp.error_code)
945 return resp.solution.joint_state
947 except rospy.ServiceException
as e:
948 rospy.logerr(
"Service call failed: %s" % e)
951 time=0.002, wait=True, ik_constraints=None):
953 Specify a target pose for the end-effector and moves to it. 954 @param target_pose - new pose of end-effector: a Pose message, a PoseStamped 955 message or a list of 6 floats: [x, y, z, rot_x, rot_y, rot_z] or a list 956 of 7 floats [x, y, z, qx, qy, qz, qw]. 957 @param avoid_collisions - Find an IK solution that avoids collisions. By default, this is false. 958 @param time - time in s (counting from now) for the robot to reach the 959 target (it needs to be greater than 0.0 for it not to be rejected by 960 the trajectory controller). 961 @param wait - should method wait for movement end or not. 962 @param ik_constraints - Set constraints of type Constraints for computing the IK solution. 964 joint_state = self.
get_ik(target_pose, avoid_collisions, ik_constraints=ik_constraints)
965 if joint_state
is not None:
967 current_indices = [i
for i, x
in enumerate(joint_state.name)
if any(thing
in x
for thing
in active_joints)]
968 current_names = [joint_state.name[i]
for i
in current_indices]
969 current_positions = [joint_state.position[i]
for i
in current_indices]
970 state_as_dict = dict(zip(current_names, current_positions))
def plan_to_named_target(self, name, custom_start_state=None)
def move_to_named_target(self, name, wait=True)
def get_end_effector_pose_from_named_state(self, name)
def change_teach_mode(mode, robot)
def get_planning_frame(self)
def move_to_pose_value_target_unsafe(self, target_pose, avoid_collisions=False, time=0.002, wait=True, ik_constraints=None)
def get_end_effector_link(self)
def refresh_named_targets(self)
def check_given_plan_is_valid(self, plan)
def get_named_target_joint_values(self, name)
def __init__(self, value)
def plan_to_pose_target(self, pose, end_effector_link="", alternative_method=False, custom_start_state=None)
def set_named_target(self, name)
def execute_plan(self, plan)
def get_ik(self, target_pose, avoid_collisions=False, joint_states=None, ik_constraints=None)
def run_named_trajectory(self, trajectory)
def run_joint_trajectory_unsafe(self, joint_trajectory, wait=True)
def set_teach_mode(self, teach)
def get_joints_velocity(self)
def get_current_state_bounded(self)
def plan_to_waypoints_target(self, waypoints, reference_frame=None, eef_step=0.005, jump_threshold=0.0, custom_start_state=None)
def _action_done_cb(self, controller, terminal_state, result)
def set_pose_reference_frame(self, reference_frame)
def set_max_acceleration_scaling_factor(self, value)
def get_robot_state_bounded(self)
def named_target_in_srdf(self, name)
def allow_looking(self, value)
def run_named_trajectory_unsafe(self, trajectory, wait=False)
def set_max_velocity_scaling_factor(self, value)
def make_named_trajectory(self, trajectory)
def move_to_trajectory_start(self, trajectory, wait=True)
def move_to_position_target(self, xyz, end_effector_link="", wait=True)
def _call_action(self, goals)
def evaluate_given_plan(self, plan)
def get_joints_position(self)
def get_current_state(self)
def set_planner_id(self, planner_id)
def set_num_planning_attempts(self, num_planning_attempts)
def move_to_pose_target(self, pose, end_effector_link="", wait=True)
def get_named_targets(self)
def run_joint_trajectory(self, joint_trajectory)
def _set_plan(self, plan)
def __get_srdf_names(self)
def _get_joints_effort(self)
def evaluate_plan_quality(self, plan_quality, good_threshold=20, medium_threshold=50)
def send_stop_trajectory_unsafe(self)
def move_to_joint_value_target(self, joint_states, wait=True, angle_degrees=False)
def allow_replanning(self, value)
def _set_up_action_client(self, controller_list)
def move_to_joint_value_target_unsafe(self, joint_states, time=0.002, wait=True, angle_degrees=False)
def __get_warehouse_names(self)
def plan_to_position_target(self, xyz, end_effector_link="", custom_start_state=None)
def set_planning_time(self, seconds)
def _joint_states_callback(self, joint_state)
def get_end_effector_pose_from_state(self, state)
def plan_to_joint_value_target(self, joint_states, angle_degrees=False, custom_start_state=None)
def action_is_running(self, controller=None)
def get_joints_state(self)
def get_current_pose(self, reference_frame=None)
def check_plan_is_valid(self)