follow_warehouse_trajectory.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Copyright 2019 Shadow Robot Company Ltd.
3 #
4 # This program is free software: you can redistribute it and/or modify it
5 # under the terms of the GNU General Public License as published by the Free
6 # Software Foundation version 2 of the License.
7 #
8 # This program is distributed in the hope that it will be useful, but WITHOUT
9 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 # more details.
12 #
13 # You should have received a copy of the GNU General Public License along
14 # with this program. If not, see <http://www.gnu.org/licenses/>.
15 
16 import rospy
17 
18 from moveit_commander import RobotCommander
19 from moveit_commander import PlanningSceneInterface
20 from moveit_commander import MoveGroupCommander
21 from moveit_msgs.srv import ListRobotStatesInWarehouse as ListStates
22 from moveit_msgs.srv import GetRobotStateFromWarehouse as GetState
23 from moveit_msgs.srv import CheckIfRobotStateExistsInWarehouse as HasState
24 from moveit_msgs.srv import GetPositionFK
25 from sr_robot_msgs.srv import PlanTrajectoryFromList
26 from sr_robot_msgs.srv import PlanTrajectoryFromPrefix
27 from sr_robot_msgs.srv import ExecutePlannedTrajectory
28 
29 from std_msgs.msg import Header
30 import geometry_msgs.msg
31 
32 
33 class WarehousePlanner(object):
34  def __init__(self):
35  rospy.init_node('moveit_warehouse_planner', anonymous=True)
36  self.scene = PlanningSceneInterface()
37  self.robot = RobotCommander()
38 
39  rospy.sleep(2)
40  group_id = rospy.get_param("~arm_group_name")
41  self.eef_step = rospy.get_param("~eef_step", 0.01)
42  self.jump_threshold = rospy.get_param("~jump_threshold", 1000)
43 
44  self.group = MoveGroupCommander(group_id)
45  # self._add_ground()
46  self._robot_name = self.robot._r.get_robot_name()
47  self._robot_link = self.group.get_end_effector_link()
48  self._robot_frame = self.group.get_pose_reference_frame()
49 
50  self._min_wp_fraction = rospy.get_param("~min_waypoint_fraction", 0.9)
51 
52  rospy.sleep(4)
53  rospy.loginfo("Waiting for warehouse services...")
54  rospy.wait_for_service('/list_robot_states')
55  rospy.wait_for_service('/get_robot_state')
56  rospy.wait_for_service('/has_robot_state')
57 
58  rospy.wait_for_service('/compute_fk')
59  self._list_states = rospy.ServiceProxy(
60  '/list_robot_states', ListStates)
61  self._get_state = rospy.ServiceProxy(
62  '/get_robot_state', GetState)
63  self._has_state = rospy.ServiceProxy(
64  '/has_robot_state', HasState)
65  self._forward_k = rospy.ServiceProxy(
66  'compute_fk', GetPositionFK)
67  rospy.loginfo("Service proxies connected")
68 
69  self._tr_frm_list_srv = rospy.Service('plan_trajectory_from_list',
70  PlanTrajectoryFromList,
71  self._plan_from_list_cb)
72 
73  self._tr_frm_prfx_srv = rospy.Service('plan_trajectory_from_prefix',
74  PlanTrajectoryFromPrefix,
76 
77  self._execute_tr_srv = rospy.Service('execute_planned_trajectory',
78  ExecutePlannedTrajectory,
79  self._execute_plan_cb)
80 
81  self.__plan = None
82 
83  def get_waypoint_names_by_prefix(self, prefix):
84  regex = "^" + str(prefix) + ".*"
85  waypoint_names = self._list_states(regex, self._robot_name).states
86  return waypoint_names
87 
88  def get_pose_from_state(self, state):
89  header = Header()
90  fk_link_names = [self._robot_link]
91  header.frame_id = self._robot_frame
92  response = self._forward_k(header, fk_link_names, state)
93  return response.pose_stamped[0].pose
94 
95  def get_cartesian_waypoints(self, waypoint_names):
96  waypoints = []
97  waypoints.append(self.group.get_current_pose().pose)
98  for name in waypoint_names:
99  if self._has_state(name, self._robot_name).exists:
100  robot_state = self._get_state(name, "").state
101  waypoints.append(self.get_pose_from_state(robot_state))
102  else:
103  rospy.logerr("Waypoint %s doesn't exist for robot %s.", name,
104  self._robot_name)
105  return waypoints
106 
107  def _add_ground(self):
108  p = geometry_msgs.msg.PoseStamped()
109  p.header.frame_id = self.robot.get_planning_frame()
110 
111  p.pose.position.x = 0
112  p.pose.position.y = 0
113  # offset such that the box is below the dome
114  p.pose.position.z = -0.11
115  p.pose.orientation.x = 0
116  p.pose.orientation.y = 0
117  p.pose.orientation.z = 0
118  p.pose.orientation.w = 1
119  self.scene.add_box("ground", p, (3, 3, 0.01))
120  rospy.sleep(1)
121 
122  def plan_from_filter(self, prefix):
123  waypoint_names = self.get_waypoint_names_by_prefix(prefix)
124  waypoint_names.sort()
125 
126  if 0 == len(waypoint_names):
127  rospy.logerr("No waypoints found for robot %s with prefix %s. " +
128  "Can't make trajectory :(",
129  self._robot_name, str(prefix))
130  return False
131  rospy.loginfo("Creating trajectory with %d waypoints selected by " +
132  "prefix %s.", len(waypoint_names), str(prefix))
133  return self.plan_from_list(waypoint_names)
134 
135  def plan_from_list(self, waypoint_names):
136  self.group.clear_pose_targets()
137  waypoints = self.get_cartesian_waypoints(waypoint_names)
138  if len(waypoints) != len(waypoint_names) + 1:
139  # +1 because current position is used as first waypiont.
140  rospy.logerr("Not all waypoints existed, not executing.")
141  return False
142  (plan, fraction) = self.group.compute_cartesian_path(
143  waypoints, self.eef_step, self.jump_threshold)
144 
145  if fraction < self._min_wp_fraction:
146  rospy.logerr("Only managed to generate trajectory through" +
147  "%f of waypoints. Not executing", fraction)
148  return False
149 
150  self.__plan = plan
151  return True
152 
153  def _plan_from_list_cb(self, request):
154  # check all exist
155  self.__plan = None
156  rospy.loginfo("Creating trajectory from points given: %s",
157  ",".join(request.waypoint_names))
158  return self.plan_from_list(request.waypoint_names)
159 
160  def _plan_from_prefix_cb(self, request):
161  self.__plan = None
162  rospy.loginfo("Creating trajectory from points filtered by prefix %s",
163  request.prefix)
164  return self.plan_from_filter(request.prefix)
165 
166  def _execute_plan_cb(self, request):
167  if self.__plan is None:
168  rospy.logerr("No plan stored!")
169  return False
170  rospy.loginfo("Executing stored plan")
171  response = self.group.execute(self.__plan)
172  self.__plan = None
173  return response
174 
175 if __name__ == "__main__":
177  rospy.spin()


sr_robot_commander
Author(s): Andriy Petlovanyy
autogenerated on Wed Oct 14 2020 04:05:30