sr_robot_commander.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 # Copyright 2015 Shadow Robot Company Ltd.
4 #
5 # This program is free software: you can redistribute it and/or modify it
6 # under the terms of the GNU General Public License as published by the Free
7 # Software Foundation version 2 of the License.
8 #
9 # This program is distributed in the hope that it will be useful, but WITHOUT
10 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
12 # more details.
13 #
14 # You should have received a copy of the GNU General Public License along
15 # with this program. If not, see <http://www.gnu.org/licenses/>.
16 
17 import threading
18 
19 import rospy
20 from actionlib import SimpleActionClient
21 from control_msgs.msg import FollowJointTrajectoryAction, \
22  FollowJointTrajectoryGoal
23 from moveit_commander import MoveGroupCommander, RobotCommander, \
24  PlanningSceneInterface
25 from moveit_msgs.msg import RobotTrajectory, PositionIKRequest
26 from sensor_msgs.msg import JointState
27 import geometry_msgs.msg
28 from sr_robot_msgs.srv import RobotTeachMode, RobotTeachModeRequest, \
29  RobotTeachModeResponse
30 
31 from moveit_msgs.srv import GetPositionIK
32 from moveit_msgs.srv import ListRobotStatesInWarehouse as ListStates
33 from moveit_msgs.srv import GetRobotStateFromWarehouse as GetState
34 from moveit_msgs.msg import OrientationConstraint, Constraints
35 
36 from trajectory_msgs.msg import JointTrajectoryPoint, JointTrajectory
37 from math import radians
38 
39 from moveit_msgs.srv import GetPositionFK
40 from std_msgs.msg import Header
41 
42 import tf2_ros
43 import copy
44 import numpy
45 
46 
47 class SrRobotCommanderException(Exception):
48 
49  def __init__(self, value):
50  self._value = value
51 
52  def __str__(self):
53  return repr(self._value)
54 
55 
56 class SrRobotCommander(object):
57 
58  """
59  Base class for hand and arm commanders.
60  """
61 
62  def __init__(self, name):
63  """
64  Initialize MoveGroupCommander object.
65  @param name - name of the MoveIt group.
66  """
67  self._name = name
68  self._move_group_commander = MoveGroupCommander(name)
69 
70  self._robot_commander = RobotCommander()
71 
72  self._robot_name = self._robot_commander._r.get_robot_name()
73 
75 
76  self._warehouse_name_get_srv = rospy.ServiceProxy("get_robot_state",
77  GetState)
78  self._planning_scene = PlanningSceneInterface()
79 
80  self._joint_states_lock = threading.Lock()
82  rospy.Subscriber("joint_states", JointState,
83  self._joint_states_callback, queue_size=1)
84  self._joints_position = {}
85  self._joints_velocity = {}
86  self._joints_effort = {}
87  self._joints_state = None
88  self._clients = {}
89  self.__plan = None
90 
91  self._controllers = {}
92 
93  rospy.wait_for_service('compute_ik')
94  self._compute_ik = rospy.ServiceProxy('compute_ik', GetPositionIK)
95  self._forward_k = rospy.ServiceProxy('compute_fk', GetPositionFK)
96 
97  controller_list_param = rospy.get_param("/move_group/controller_list")
98 
99  # create dictionary with name of controllers and corresponding joints
100  self._controllers = {item["name"]: item["joints"] for item in controller_list_param}
101 
103 
106 
107  threading.Thread(None, rospy.spin)
108 
109  def set_planner_id(self, planner_id):
110  """
111  Sets the planner_id used for all future planning requests.
112  @param planner_id - The string for the planner id, set to None to clear.
113  """
114  self._move_group_commander.set_planner_id(planner_id)
115 
116  def set_num_planning_attempts(self, num_planning_attempts):
117  self._move_group_commander.set_num_planning_attempts(num_planning_attempts)
118 
119  def set_planning_time(self, seconds):
120  """
121  Specifies the amount of time to be used for motion planning.
122  Some planning algorithms might require more time than others to compute
123  a valid solution.
124  """
126 
128  state = self._warehouse_name_get_srv(name, self._robot_name).state
129  return self.get_end_effector_pose_from_state(state)
130 
132  header = Header()
133  fk_link_names = [self._move_group_commander.get_end_effector_link()]
134  header.frame_id = self._move_group_commander.get_pose_reference_frame()
135  response = self._forward_k(header, fk_link_names, state)
136  return response.pose_stamped[0]
137 
139  """
140  @return - returns the name of the frame wrt which the motion planning
141  is computed.
142  """
144 
145  def set_pose_reference_frame(self, reference_frame):
146  """
147  Set the reference frame to assume for poses of end-effectors.
148  @param reference_frame - name of the frame.
149  """
150  self._move_group_commander.set_pose_reference_frame(reference_frame)
151 
152  def get_group_name(self):
153  return self._name
154 
158 
160  """
161  Set a scaling factor for optionally reducing the maximum joint velocity.
162  @param value - Allowed values are in (0,1].
163  """
165 
167  """
168  Set a scaling factor for optionally reducing the maximum joint accelaration.
169  @param value - Allowed values are in (0,1].
170  """
172 
173  def allow_looking(self, value):
174  """
175  Enable/disable looking around for motion planning.
176  @param value - boolean.
177  """
179 
180  def allow_replanning(self, value):
181  """
182  Enable/disable replanning in case new obstacles are detected
183  while executing a plan.
184  @param value - boolean.
185  """
187 
188  def execute(self):
189  """
190  Executes the last plan made.
191  """
192  if self.check_plan_is_valid():
194  self.__plan = None
195  else:
196  rospy.logwarn("No plans were made, not executing anything.")
197 
198  def execute_plan(self, plan):
199  """
200  Executes a given plan.
201  @param plan - RobotTrajectory msg that contains the trajectory
202  to the set goal state.
203  """
204  if self.check_given_plan_is_valid(plan):
205  self._move_group_commander.execute(plan)
206  self.__plan = None
207  else:
208  rospy.logwarn("Plan is not valid, not executing anything.")
209 
210  def move_to_joint_value_target(self, joint_states, wait=True,
211  angle_degrees=False):
212  """
213  Set target of the robot's links and moves to it.
214  @param joint_states - dictionary with joint name and value. It can
215  contain only joints values of which need to be changed.
216  @param wait - should method wait for movement end or not.
217  @param angle_degrees - are joint_states in degrees or not.
218  """
219  joint_states_cpy = copy.deepcopy(joint_states)
220 
221  if angle_degrees:
222  joint_states_cpy.update((joint, radians(i))
223  for joint, i in joint_states_cpy.items())
224  self._move_group_commander.set_start_state_to_current_state()
225  self._move_group_commander.set_joint_value_target(joint_states_cpy)
226  self._move_group_commander.go(wait=wait)
227 
228  def plan_to_joint_value_target(self, joint_states, angle_degrees=False, custom_start_state=None):
229  """
230  Set target of the robot's links and plans.
231  @param joint_states - dictionary with joint name and value. It can
232  contain only joints values of which need to be changed.
233  @param angle_degrees - are joint_states in degrees or not.
234  @param custom_start_state - specify a start state different than the current state
235  This is a blocking method.
236  @return - motion plan (RobotTrajectory msg) that contains the trajectory to the set goal state.
237  """
238  joint_states_cpy = copy.deepcopy(joint_states)
239 
240  if angle_degrees:
241  joint_states_cpy.update((joint, radians(i))
242  for joint, i in joint_states_cpy.items())
243  if custom_start_state is None:
244  self._move_group_commander.set_start_state_to_current_state()
245  else:
246  self._move_group_commander.set_start_state(custom_start_state)
247  self._move_group_commander.set_joint_value_target(joint_states_cpy)
248  self.__plan = self._move_group_commander.plan()
249  return self.__plan
250 
252  """
253  Checks if current plan contains a valid trajectory
254  """
255  return (self.__plan is not None and len(self.__plan.joint_trajectory.points) > 0)
256 
257  def check_given_plan_is_valid(self, plan):
258  """
259  Checks if given plan contains a valid trajectory
260  """
261  return (plan is not None and len(plan.joint_trajectory.points) > 0)
262 
263  def evaluate_given_plan(self, plan):
264  """
265  Returns given plan quality calculated by a weighted sum of angles traveled by each
266  of the joints, giving higher weights to the joints closer to the base of the robot,
267  thus penalizing them as smallmovements of these joints will result in bigger movements
268  of the end effector. Formula:
269 
270  PQ = sum_(i=0)^(n-1){w_i * abs(x_i - x_(i0)}, where:
271 
272  n - number of robot's joints,
273  w - weight specified for each joint,
274  x - joint's goal position,
275  x_0 - joint's initial position.
276 
277  The lower the value, the better the plan.
278  """
279 
280  if plan is None:
281  return None
282 
283  num_of_joints = len(plan.joint_trajectory.points[0].positions)
284  weights = numpy.array(sorted(range(1, num_of_joints + 1), reverse=True))
285  plan_array = numpy.empty(shape=(len(plan.joint_trajectory.points),
286  num_of_joints))
287 
288  for i, point in enumerate(plan.joint_trajectory.points):
289  plan_array[i] = point.positions
290 
291  deltas = abs(numpy.diff(plan_array, axis=0))
292  sum_deltas = numpy.sum(deltas, axis=0)
293  sum_deltas_weighted = sum_deltas * weights
294  plan_quality = float(numpy.sum(sum_deltas_weighted))
295  return plan_quality
296 
297  def evaluate_plan(self):
298  return self.evaluate_given_plan(self.__plan)
299 
300  def evaluate_plan_quality(self, plan_quality, good_threshold=20, medium_threshold=50):
301  if plan_quality > medium_threshold:
302  rospy.logwarn("Low plan quality! Value: {}".format(plan_quality))
303  return 'poor'
304  elif (plan_quality > good_threshold and
305  plan_quality < medium_threshold):
306  rospy.loginfo("Medium plan quality. Value: {}".format(plan_quality))
307  return 'medium'
308  elif plan_quality < good_threshold:
309  rospy.loginfo("Good plan quality. Value: {}".format(plan_quality))
310  return 'good'
311 
312  def get_robot_name(self):
313  return self._robot_name
314 
315  def named_target_in_srdf(self, name):
316  return name in self._srdf_names
317 
318  def set_named_target(self, name):
319  """
320  Set a joint configuration by name.
321  @param name - name of the target which must correspond to a name defined,
322  either in the srdf or in the mongo warehouse database.
323  @return - bool to confirm that the target has been correctly set.
324  """
325  if name in self._srdf_names:
327  elif (name in self._warehouse_names):
328  response = self._warehouse_name_get_srv(name, self._robot_name)
329 
330  active_names = self._move_group_commander._g.get_active_joints()
331  joints = response.state.joint_state.name
332  positions = response.state.joint_state.position
333  js = {}
334 
335  for n, this_name in enumerate(joints):
336  if this_name in active_names:
337  js[this_name] = positions[n]
338  try:
339  self._move_group_commander.set_joint_value_target(js)
340  except Exception as e:
341  rospy.loginfo(e)
342  else:
343  rospy.logerr("Unknown named state '%s'..." % name)
344  return False
345  return True
346 
348  """
349  Get the joint angles for targets specified by name.
350  @param name - @param name - name of the target which must correspond to a name defined,
351  either in the srdf or in the mongo warehouse database.
352  @return - joint values of the named target.
353  """
354  output = dict()
355 
356  if (name in self._srdf_names):
357  output = self._move_group_commander._g.get_named_target_values(str(name))
358 
359  elif (name in self._warehouse_names):
360  js = self._warehouse_name_get_srv(
361  name, self._robot_name).state.joint_state
362 
363  for x, n in enumerate(js.name):
364  if n in self._move_group_commander._g.get_joints():
365  output[n] = js.position[x]
366 
367  else:
368  rospy.logerr("No target named %s" % name)
369 
370  return None
371 
372  return output
373 
376 
377  def get_current_pose(self, reference_frame=None):
378  """
379  Get the current pose of the end effector.
380  @param reference_frame - The desired reference frame in which end effector pose should be returned.
381  If none is passed, it will use the planning frame as reference.
382  @return - geometry_msgs.msg.Pose() - current pose of the end effector.
383  """
384  if reference_frame is not None:
385  try:
386  trans = self.tf_buffer.lookup_transform(reference_frame,
388  rospy.Time(0),
389  rospy.Duration(5.0))
390  current_pose = geometry_msgs.msg.Pose()
391  current_pose.position.x = trans.transform.translation.x
392  current_pose.position.y = trans.transform.translation.y
393  current_pose.position.z = trans.transform.translation.z
394  current_pose.orientation.x = trans.transform.rotation.x
395  current_pose.orientation.y = trans.transform.rotation.y
396  current_pose.orientation.z = trans.transform.rotation.z
397  current_pose.orientation.w = trans.transform.rotation.w
398  return current_pose
399  except (tf2_ros.LookupException, tf2_ros.ConnectivityException, tf2_ros.ExtrapolationException):
400  rospy.logwarn("Couldn't get the pose from " + self._move_group_commander.get_end_effector_link() +
401  " in " + reference_frame + " reference frame")
402  return None
403  else:
404  return self._move_group_commander.get_current_pose().pose
405 
406  def get_current_state(self):
407  """
408  Get the current joint state of the group being used.
409  @return - a dictionary with the joint names as keys and current joint values.
410  """
411  joint_names = self._move_group_commander._g.get_active_joints()
412  joint_values = self._move_group_commander._g.get_current_joint_values()
413 
414  return dict(zip(joint_names, joint_values))
415 
417  """
418  Get the current joint state of the group being used, enforcing that they are within each joint limits.
419  @return - a dictionary with the joint names as keys and current joint values.
420  """
421  current = self._move_group_commander._g.get_current_state_bounded()
422  names = self._move_group_commander._g.get_active_joints()
423  output = {n: current[n] for n in names if n in current}
424 
425  return output
426 
428  return self._move_group_commander._g.get_current_state_bounded()
429 
430  def move_to_named_target(self, name, wait=True):
431  """
432  Set target of the robot's links and moves to it
433  @param name - name of the target pose defined in SRDF
434  @param wait - should method wait for movement end or not
435  """
436  self._move_group_commander.set_start_state_to_current_state()
437  if self.set_named_target(name):
438  self._move_group_commander.go(wait=wait)
439 
440  def plan_to_named_target(self, name, custom_start_state=None):
441  """
442  Set target of the robot's links and plans
443  This is a blocking method.
444  @param name - name of the target pose defined in SRDF.
445  @param custom_start_state - specify a start state different than the current state.
446  @return - a motion plan (RobotTrajectory msg) that contains the trajectory to the named target.
447  """
448  if custom_start_state is None:
449  self._move_group_commander.set_start_state_to_current_state()
450  else:
451  self._move_group_commander.set_start_state(custom_start_state)
452  if self.set_named_target(name):
453  self.__plan = self._move_group_commander.plan()
454 
456  try:
457  list_srv = rospy.ServiceProxy("list_robot_states", ListStates)
458  return list_srv("", self._robot_name).states
459 
460  except rospy.ServiceException as exc:
461  rospy.logwarn("Couldn't access warehouse: " + str(exc))
462  return list()
463 
464  def _reset_plan(self):
465  self.__plan = None
466 
467  def _set_plan(self, plan):
468  self.__plan = plan
469 
470  def __get_srdf_names(self):
471  return self._move_group_commander._g.get_named_targets()
472 
473  def get_named_targets(self):
474  """
475  Get the complete list of named targets, from SRDF
476  as well as warehouse poses if available.
477  @return - list of strings containing names of targets.
478  """
479  return self._srdf_names + self._warehouse_names
480 
482  """
483  Returns joints position.
484  @return - dictionary with joints positions.
485  """
486  with self._joint_states_lock:
487  return self._joints_position
488 
490  """
491  Returns joints velocities
492  @return - dictionary with joints velocities.
493  """
494  with self._joint_states_lock:
495  return self._joints_velocity
496 
498  """
499  Returns joints effort.
500  @return - dictionary with joints efforts.
501  """
502  with self._joint_states_lock:
503  return self._joints_effort
504 
505  def get_joints_state(self):
506  """
507  Returns joints state
508  @return - JointState message
509  """
510  with self._joint_states_lock:
511  return self._joints_state
512 
513  def run_joint_trajectory(self, joint_trajectory):
514  """
515  Moves robot through all joint states with specified timeouts.
516  @param joint_trajectory - JointTrajectory class object. Represents
517  trajectory of the joints which would be executed.
518  """
519  plan = RobotTrajectory()
520  plan.joint_trajectory = joint_trajectory
521  self._move_group_commander.execute(plan)
522 
523  def make_named_trajectory(self, trajectory):
524  """
525  Makes joint value trajectory from specified by named poses (either from
526  SRDF or from warehouse).
527  @param trajectory - list of waypoints, each waypoint is a dict with
528  the following elements (n.b either name or joint_angles is required)
529  - name -> the name of the way point
530  - joint_angles -> a dict of joint names and angles
531  - interpolate_time -> time to move from last wp
532  - pause_time -> time to wait at this wp
533  - degrees -> set to true if joint_angles is specified in degrees. Assumed false if absent.
534  """
535  current = self.get_current_state_bounded()
536 
537  joint_trajectory = JointTrajectory()
538  joint_names = current.keys()
539  joint_trajectory.joint_names = joint_names
540 
541  start = JointTrajectoryPoint()
542  start.positions = current.values()
543  start.time_from_start = rospy.Duration.from_sec(0.001)
544  joint_trajectory.points.append(start)
545 
546  time_from_start = 0.0
547 
548  for wp in trajectory:
549 
550  joint_positions = None
551  if 'name' in wp.keys():
552  joint_positions = self.get_named_target_joint_values(wp['name'])
553  elif 'joint_angles' in wp.keys():
554  joint_positions = copy.deepcopy(wp['joint_angles'])
555  if 'degrees' in wp.keys() and wp['degrees']:
556  for joint, angle in joint_positions.iteritems():
557  joint_positions[joint] = radians(angle)
558 
559  if joint_positions is None:
560  rospy.logerr("Invalid waypoint. Must contain valid name for named target or dict of joint angles.")
561  return None
562 
563  new_positions = {}
564 
565  for n in joint_names:
566  new_positions[n] = joint_positions[n] if n in joint_positions else current[n]
567 
568  trajectory_point = JointTrajectoryPoint()
569  trajectory_point.positions = [new_positions[n] for n in joint_names]
570 
571  current = new_positions
572 
573  time_from_start += wp['interpolate_time']
574  trajectory_point.time_from_start = rospy.Duration.from_sec(time_from_start)
575  joint_trajectory.points.append(trajectory_point)
576 
577  if 'pause_time' in wp and wp['pause_time'] > 0:
578  extra = JointTrajectoryPoint()
579  extra.positions = trajectory_point.positions
580  time_from_start += wp['pause_time']
581  extra.time_from_start = rospy.Duration.from_sec(time_from_start)
582  joint_trajectory.points.append(extra)
583 
584  return joint_trajectory
585 
587  """
588  Sends a trajectory of all active joints at their current position.
589  This stops the robot.
590  """
591 
592  current = self.get_current_state_bounded()
593 
594  trajectory_point = JointTrajectoryPoint()
595  trajectory_point.positions = current.values()
596  trajectory_point.time_from_start = rospy.Duration.from_sec(0.1)
597 
598  trajectory = JointTrajectory()
599  trajectory.points.append(trajectory_point)
600  trajectory.joint_names = current.keys()
601 
602  self.run_joint_trajectory_unsafe(trajectory)
603 
604  def run_named_trajectory_unsafe(self, trajectory, wait=False):
605  """
606  Moves robot through trajectory specified by named poses, either from
607  SRDF or from warehouse. Runs trajectory directly via contoller.
608  @param trajectory - list of waypoints, each waypoint is a dict with
609  the following elements:
610  - name -> the name of the way point
611  - interpolate_time -> time to move from last wp
612  - pause_time -> time to wait at this wp
613  """
614  joint_trajectory = self.make_named_trajectory(trajectory)
615  if joint_trajectory is not None:
616  self.run_joint_trajectory_unsafe(joint_trajectory, wait)
617 
618  def run_named_trajectory(self, trajectory):
619  """
620  Moves robot through trajectory specified by named poses, either from
621  SRDF or from warehouse. Runs trajectory via moveit.
622  @param trajectory - list of waypoints, each waypoint is a dict with
623  the following elements:
624  - name -> the name of the way point
625  - interpolate_time -> time to move from last wp
626  - pause_time -> time to wait at this wp
627  """
628  joint_trajectory = self.make_named_trajectory(trajectory)
629  if joint_trajectory is not None:
630  self.run_joint_trajectory(joint_trajectory)
631 
632  def move_to_position_target(self, xyz, end_effector_link="", wait=True):
633  """
634  Specify a target position for the end-effector and moves to it.
635  @param xyz - new position of end-effector.
636  @param end_effector_link - name of the end effector link.
637  @param wait - should method wait for movement end or not.
638  """
639  self._move_group_commander.set_start_state_to_current_state()
640  self._move_group_commander.set_position_target(xyz, end_effector_link)
641  self._move_group_commander.go(wait=wait)
642 
643  def plan_to_position_target(self, xyz, end_effector_link="", custom_start_state=None):
644  """
645  Specify a target position for the end-effector and plans.
646  This is a blocking method.
647  @param xyz - new position of end-effector.
648  @param end_effector_link - name of the end effector link.
649  @param custom_start_state - specify a start state different than the current state.
650  """
651  if custom_start_state is None:
652  self._move_group_commander.set_start_state_to_current_state()
653  else:
654  self._move_group_commander.set_start_state(custom_start_state)
655  self._move_group_commander.set_position_target(xyz, end_effector_link)
656  self.__plan = self._move_group_commander.plan()
657 
658  def move_to_pose_target(self, pose, end_effector_link="", wait=True):
659  """
660  Specify a target pose for the end-effector and moves to it
661  @param pose - new pose of end-effector: a Pose message, a PoseStamped
662  message or a list of 6 floats: [x, y, z, rot_x, rot_y, rot_z] or a list
663  of 7 floats [x, y, z, qx, qy, qz, qw].
664  @param end_effector_link - name of the end effector link.
665  @param wait - should method wait for movement end or not.
666  """
667  self._move_group_commander.set_start_state_to_current_state()
668  self._move_group_commander.set_pose_target(pose, end_effector_link)
669  self._move_group_commander.go(wait=wait)
670 
671  def plan_to_pose_target(self, pose, end_effector_link="", alternative_method=False, custom_start_state=None):
672  """
673  Specify a target pose for the end-effector and plans.
674  This is a blocking method.
675  @param pose - new pose of end-effector: a Pose message, a PoseStamped
676  message or a list of 6 floats: [x, y, z, rot_x, rot_y, rot_z] or a list
677  of 7 floats [x, y, z, qx, qy, qz, qw].
678  @param end_effector_link - name of the end effector link.
679  @param alternative_method - use set_joint_value_target instead of set_pose_target.
680  @param custom_start_state - specify a start state different than the current state.
681  """
682  if custom_start_state is None:
683  self._move_group_commander.set_start_state_to_current_state()
684  else:
685  self._move_group_commander.set_start_state(custom_start_state)
686  if alternative_method:
687  self._move_group_commander.set_joint_value_target(pose, end_effector_link)
688  else:
689  self._move_group_commander.set_pose_target(pose, end_effector_link)
690  self.__plan = self._move_group_commander.plan()
691  return self.__plan
692 
693  def _joint_states_callback(self, joint_state):
694  """
695  The callback function for the topic joint_states.
696  It will store the received joint position, velocity and efforts
697  information into dictionaries.
698  @param joint_state - the message containing the joints data.
699  """
700  with self._joint_states_lock:
701  self._joints_state = joint_state
702  self._joints_position = {n: p for n, p in
703  zip(joint_state.name,
704  joint_state.position)}
705  self._joints_velocity = {n: v for n, v in
706  zip(joint_state.name,
707  joint_state.velocity)}
708  self._joints_effort = {n: v for n, v in
709  zip(joint_state.name, joint_state.effort)}
710 
711  def _set_up_action_client(self, controller_list):
712  """
713  Sets up an action client to communicate with the trajectory controller.
714  """
715  self._action_running = {}
716 
717  for controller_name in controller_list.keys():
718  self._action_running[controller_name] = False
719  service_name = controller_name + "/follow_joint_trajectory"
720  self._clients[controller_name] = SimpleActionClient(service_name,
721  FollowJointTrajectoryAction)
722  if self._clients[controller_name].wait_for_server(timeout=rospy.Duration(4)) is False:
723  err_msg = 'Failed to connect to action server ({}) in 4 sec'.format(service_name)
724  rospy.logwarn(err_msg)
725 
726  def move_to_joint_value_target_unsafe(self, joint_states, time=0.002,
727  wait=True, angle_degrees=False):
728  """
729  Set target of the robot's links and moves to it.
730  @param joint_states - dictionary with joint name and value. It can
731  contain only joints values of which need to be changed.
732  @param time - time in s (counting from now) for the robot to reach the
733  target (it needs to be greater than 0.0 for it not to be rejected by
734  the trajectory controller).
735  @param wait - should method wait for movement end or not.
736  @param angle_degrees - are joint_states in degrees or not.
737  """
738  # self._update_default_trajectory()
739  # self._set_targets_to_default_trajectory(joint_states)
740  goals = {}
741  joint_states_cpy = copy.deepcopy(joint_states)
742 
743  if angle_degrees:
744  joint_states_cpy.update((joint, radians(i))
745  for joint, i in joint_states_cpy.items())
746 
747  for controller in self._controllers:
748  controller_joints = self._controllers[controller]
749  goal = FollowJointTrajectoryGoal()
750  goal.trajectory.joint_names = []
751  point = JointTrajectoryPoint()
752  point.positions = []
753 
754  for x in joint_states_cpy.keys():
755  if x in controller_joints:
756  goal.trajectory.joint_names.append(x)
757  point.positions.append(joint_states_cpy[x])
758 
759  point.time_from_start = rospy.Duration.from_sec(time)
760 
761  goal.trajectory.points = [point]
762 
763  goals[controller] = goal
764 
765  self._call_action(goals)
766 
767  if not wait:
768  return
769 
770  for i in self._clients.keys():
771  if not self._clients[i].wait_for_result():
772  rospy.loginfo("Trajectory not completed")
773 
774  def action_is_running(self, controller=None):
775  if controller is not None:
776  return self._action_running[controller]
777 
778  for controller_running in self._action_running.values():
779  if controller_running:
780  return True
781  return False
782 
783  def _action_done_cb(self, controller, terminal_state, result):
784  self._action_running[controller] = False
785 
786  def _call_action(self, goals):
787  for client in self._clients:
788  self._action_running[client] = True
789  self._clients[client].send_goal(
790  goals[client], lambda terminal_state, result: self._action_done_cb(client, terminal_state, result))
791 
792  def run_joint_trajectory_unsafe(self, joint_trajectory, wait=True):
793  """
794  Moves robot through all joint states with specified timeouts.
795  @param joint_trajectory - JointTrajectory class object. Represents
796  trajectory of the joints which would be executed.
797  @param wait - should method wait for movement end or not.
798  """
799  goals = {}
800  for controller in self._controllers:
801  controller_joints = self._controllers[controller]
802  goal = FollowJointTrajectoryGoal()
803  goal.trajectory = copy.deepcopy(joint_trajectory)
804 
805  indices_of_joints_in_this_controller = []
806 
807  for i, joint in enumerate(joint_trajectory.joint_names):
808  if joint in controller_joints:
809  indices_of_joints_in_this_controller.append(i)
810 
811  goal.trajectory.joint_names = [
812  joint_trajectory.joint_names[i] for i in indices_of_joints_in_this_controller]
813 
814  for point in goal.trajectory.points:
815  if point.positions:
816  point.positions = [point.positions[i] for i in indices_of_joints_in_this_controller]
817  if point.velocities:
818  point.velocities = [point.velocities[i] for i in indices_of_joints_in_this_controller]
819  if point.effort:
820  point.effort = [point.effort[i] for i in indices_of_joints_in_this_controller]
821 
822  goals[controller] = goal
823 
824  self._call_action(goals)
825 
826  if not wait:
827  return
828 
829  for i in self._clients.keys():
830  if not self._clients[i].wait_for_result():
831  rospy.loginfo("Trajectory not completed")
832 
833  def plan_to_waypoints_target(self, waypoints, reference_frame=None,
834  eef_step=0.005, jump_threshold=0.0, custom_start_state=None):
835  """
836  Specify a set of waypoints for the end-effector and plans.
837  This is a blocking method.
838  @param reference_frame - the reference frame in which the waypoints are given.
839  @param waypoints - an array of poses of end-effector.
840  @param eef_step - configurations are computed for every eef_step meters.
841  @param jump_threshold - maximum distance in configuration space between consecutive points in the
842  resulting path.
843  @param custom_start_state - specify a start state different than the current state.
844  @return - motion plan (RobotTrajectory msg) that contains the trajectory to the set wayapoints targets.
845  """
846  if custom_start_state is None:
847  self._move_group_commander.set_start_state_to_current_state()
848  else:
849  self._move_group_commander.set_start_state(custom_start_state)
850  old_frame = self._move_group_commander.get_pose_reference_frame()
851  if reference_frame is not None:
852  self.set_pose_reference_frame(reference_frame)
853  self.__plan, fraction = self._move_group_commander.compute_cartesian_path(waypoints, eef_step, jump_threshold)
854  self.set_pose_reference_frame(old_frame)
855  return self.__plan, fraction
856 
857  def set_teach_mode(self, teach):
858  """
859  Activates/deactivates the teach mode for the robot.
860  Activation: stops the the trajectory controllers for the robot, and
861  sets it to teach mode.
862  Deactivation: stops the teach mode and starts trajectory controllers
863  for the robot.
864  Currently this method blocks for a few seconds when called on a hand,
865  while the hand parameters are reloaded.
866  @param teach - bool to activate or deactivate teach mode
867  """
868 
869  if teach:
870  mode = RobotTeachModeRequest.TEACH_MODE
871  else:
872  mode = RobotTeachModeRequest.TRAJECTORY_MODE
873  self.change_teach_mode(mode, self._name)
874 
875  def move_to_trajectory_start(self, trajectory, wait=True):
876  """
877  Make and execute a plan from the current state to the first state in an pre-existing trajectory.
878  @param trajectory - moveit_msgs/JointTrajectory.
879  @param wait - Bool to specify if movement should block untill finished.
880  """
881 
882  if len(trajectory.points) <= 0:
883  rospy.logerr("Trajectory has no points in it, can't reverse...")
884  return None
885 
886  first_point = trajectory.points[0]
887  end_state = dict(zip(trajectory.joint_names, first_point.positions))
888  self.move_to_joint_value_target(end_state, wait=wait)
889 
890  @staticmethod
891  def change_teach_mode(mode, robot):
892  teach_mode_client = rospy.ServiceProxy('/teach_mode', RobotTeachMode)
893 
894  req = RobotTeachModeRequest()
895  req.teach_mode = mode
896  req.robot = robot
897  try:
898  resp = teach_mode_client(req)
899  if resp.result == RobotTeachModeResponse.ERROR:
900  rospy.logerr("Failed to change robot %s to mode %d", robot,
901  mode)
902  else:
903  rospy.loginfo("Changed robot %s to mode %d Result = %d", robot,
904  mode, resp.result)
905  except rospy.ServiceException:
906  rospy.logerr("Failed to call service teach_mode")
907 
908  def get_ik(self, target_pose, avoid_collisions=False, joint_states=None, ik_constraints=None):
909 
910  """
911  Computes the inverse kinematics for a given pose. It returns a JointState.
912  @param target_pose - A given pose of type PoseStamped.
913  @param avoid_collisions - Find an IK solution that avoids collisions. By default, this is false.
914  @param joint_states - initial joint configuration of type JointState from which the IK solution is computed.
915  If set to None, the current joint state is retrieved automatically.
916  @param ik_constraints - Set constraints of type Constraints for computing the IK solution.
917  """
918  service_request = PositionIKRequest()
919  service_request.group_name = self._name
920  service_request.ik_link_name = self._move_group_commander.get_end_effector_link()
921  service_request.pose_stamped = target_pose
922  service_request.timeout.secs = 0.5
923  service_request.avoid_collisions = avoid_collisions
924  if ik_constraints is not None:
925  service_request.constraints = ik_constraints
926  if joint_states is None:
927  service_request.robot_state.joint_state = self.get_joints_state()
928  else:
929  service_request.robot_state.joint_state = joint_states
930 
931  try:
932  resp = self._compute_ik(ik_request=service_request)
933  # Check if error_code.val is SUCCESS=1
934  if resp.error_code.val != 1:
935  if resp.error_code.val == -10:
936  rospy.logerr("Unreachable point: Start state in collision")
937  elif resp.error_code.val == -12:
938  rospy.logerr("Unreachable point: Goal state in collision")
939  elif resp.error_code.val == -31:
940  rospy.logerr("Unreachable point: No IK solution")
941  else:
942  rospy.logerr("Unreachable point (error: %s)" % resp.error_code)
943  return
944  else:
945  return resp.solution.joint_state
946 
947  except rospy.ServiceException as e:
948  rospy.logerr("Service call failed: %s" % e)
949 
950  def move_to_pose_value_target_unsafe(self, target_pose, avoid_collisions=False,
951  time=0.002, wait=True, ik_constraints=None):
952  """
953  Specify a target pose for the end-effector and moves to it.
954  @param target_pose - new pose of end-effector: a Pose message, a PoseStamped
955  message or a list of 6 floats: [x, y, z, rot_x, rot_y, rot_z] or a list
956  of 7 floats [x, y, z, qx, qy, qz, qw].
957  @param avoid_collisions - Find an IK solution that avoids collisions. By default, this is false.
958  @param time - time in s (counting from now) for the robot to reach the
959  target (it needs to be greater than 0.0 for it not to be rejected by
960  the trajectory controller).
961  @param wait - should method wait for movement end or not.
962  @param ik_constraints - Set constraints of type Constraints for computing the IK solution.
963  """
964  joint_state = self.get_ik(target_pose, avoid_collisions, ik_constraints=ik_constraints)
965  if joint_state is not None:
966  active_joints = self._move_group_commander.get_active_joints()
967  current_indices = [i for i, x in enumerate(joint_state.name) if any(thing in x for thing in active_joints)]
968  current_names = [joint_state.name[i] for i in current_indices]
969  current_positions = [joint_state.position[i] for i in current_indices]
970  state_as_dict = dict(zip(current_names, current_positions))
971  self.move_to_joint_value_target_unsafe(state_as_dict, time=time, wait=wait)
def plan_to_named_target(self, name, custom_start_state=None)
def move_to_pose_value_target_unsafe(self, target_pose, avoid_collisions=False, time=0.002, wait=True, ik_constraints=None)
def plan_to_pose_target(self, pose, end_effector_link="", alternative_method=False, custom_start_state=None)
def get_ik(self, target_pose, avoid_collisions=False, joint_states=None, ik_constraints=None)
def run_joint_trajectory_unsafe(self, joint_trajectory, wait=True)
def plan_to_waypoints_target(self, waypoints, reference_frame=None, eef_step=0.005, jump_threshold=0.0, custom_start_state=None)
def _action_done_cb(self, controller, terminal_state, result)
def run_named_trajectory_unsafe(self, trajectory, wait=False)
def move_to_position_target(self, xyz, end_effector_link="", wait=True)
def set_num_planning_attempts(self, num_planning_attempts)
def move_to_pose_target(self, pose, end_effector_link="", wait=True)
def evaluate_plan_quality(self, plan_quality, good_threshold=20, medium_threshold=50)
def move_to_joint_value_target(self, joint_states, wait=True, angle_degrees=False)
def move_to_joint_value_target_unsafe(self, joint_states, time=0.002, wait=True, angle_degrees=False)
def plan_to_position_target(self, xyz, end_effector_link="", custom_start_state=None)
def plan_to_joint_value_target(self, joint_states, angle_degrees=False, custom_start_state=None)


sr_robot_commander
Author(s): Andriy Petlovanyy
autogenerated on Mon Feb 28 2022 23:52:25