planning_scene_interface.py
Go to the documentation of this file.
1 # Copyright 2011-2021, Michael Ferguson
2 # All rights reserved.
3 #
4 # Redistribution and use in source and binary forms, with or without
5 # modification, are permitted provided that the following conditions
6 # are met:
7 #
8 # * Redistributions of source code must retain the above copyright
9 # notice, this list of conditions and the following disclaimer.
10 # * Redistributions in binary form must reproduce the above
11 # copyright notice, this list of conditions and the following
12 # disclaimer in the documentation and/or other materials provided
13 # with the distribution.
14 #
15 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
16 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
17 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
18 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
19 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
20 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
21 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
22 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
23 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
24 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
25 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 # POSSIBILITY OF SUCH DAMAGE.
27 
28 import threading, copy
29 import rospy
30 
31 try:
32  from pyassimp import pyassimp
33  use_pyassimp = True
34 except:
35  # support pyassimp > 3.0
36  try:
37  import pyassimp
38  use_pyassimp = True
39  except:
40  # In 16.04, pyassimp is busted
41  # https://bugs.launchpad.net/ubuntu/+source/assimp/+bug/1589949
42  use_pyassimp = False
43 
44 try:
45  basestring
46 except NameError:
47  basestring = str
48 
49 from geometry_msgs.msg import Pose, PoseStamped, Point
50 from moveit_msgs.msg import CollisionObject, AttachedCollisionObject
51 from moveit_msgs.msg import PlanningScene, PlanningSceneComponents, ObjectColor
52 from moveit_msgs.srv import GetPlanningScene, ApplyPlanningScene
53 from shape_msgs.msg import MeshTriangle, Mesh, SolidPrimitive, Plane
54 
55 ## @brief A class for managing the state of the planning scene
56 ## @param frame The fixed frame in which planning is being done (needs to be part of robot?)
57 ## @param ns A namespace to push all topics down into.
58 ## @param init_from_service Whether to initialize our list of objects by calling the service
59 ## NOTE: this requires that said service be in the move_group launch file, which
60 ## is not the default from the setup assistant.
61 class PlanningSceneInterface(object):
62  def __init__(self, frame, ns='', init_from_service=True):
63  # ns must be a string
64  if not isinstance(ns, basestring):
65  rospy.logerr('Namespace must be a string!')
66  ns = ''
67  elif not ns.endswith('/'):
68  ns += '/'
69 
70  self._fixed_frame = frame
71 
72  self._scene_pub = rospy.Publisher(ns + 'planning_scene',
73  PlanningScene,
74  queue_size=10)
75  self._apply_service = rospy.ServiceProxy(ns + 'apply_planning_scene', ApplyPlanningScene)
76  # track the attached and collision objects
77  self._mutex = threading.Lock()
78  # these are updated based what the planning scene actually contains
79  self._attached = list()
80  self._collision = list()
81  # these are updated based on internal state
82  self._objects = dict()
83  self._attached_objects = dict()
84  self._removed = dict()
85  self._attached_removed = dict()
86  self._colors = dict()
87 
88  # get the initial planning scene
89  if init_from_service:
90  rospy.loginfo('Waiting for get_planning_scene')
91  rospy.wait_for_service(ns + 'get_planning_scene')
92  self._service = rospy.ServiceProxy(ns + 'get_planning_scene',
93  GetPlanningScene)
94  try:
95  req = PlanningSceneComponents()
96  req.components = sum([
97  PlanningSceneComponents.WORLD_OBJECT_NAMES,
98  PlanningSceneComponents.WORLD_OBJECT_GEOMETRY,
99  PlanningSceneComponents.ROBOT_STATE_ATTACHED_OBJECTS])
100  scene = self._service(req)
101  self.sceneCb(scene.scene, initial=True)
102  except rospy.ServiceException as e:
103  rospy.logerr('Failed to get initial planning scene, results may be wonky: %s', e)
104 
105  # subscribe to planning scene
106  rospy.Subscriber(ns + 'move_group/monitored_planning_scene',
107  PlanningScene,
108  self.sceneCb)
109 
110  ## @brief Send new update to planning scene
111  ## @param collision_object A collision object to add to scene, or None
112  ## @param attached_collision_object Attached collision object to add to scene, or None
113  ## @param use_service If true, update will be sent via apply service, otherwise by topic
114  def sendUpdate(self, collision_object, attached_collision_object, use_service=True):
115  ps = PlanningScene()
116  ps.is_diff = True
117  ps.robot_state.is_diff = True
118  if collision_object:
119  ps.world.collision_objects.append(collision_object)
120 
121  if attached_collision_object:
122  ps.robot_state.attached_collision_objects.append(attached_collision_object)
123 
124  if use_service:
125  resp = self._apply_service.call(ps)
126  if not resp.success:
127  rospy.logerr("Could not apply planning scene diff.")
128  else:
129  self._scene_pub.publish(ps)
130 
131  ## @brief Clear the planning scene of all objects
132  def clear(self):
133  for name in self.getKnownCollisionObjects():
134  self.removeCollisionObject(name, True)
135  for name in self.getKnownAttachedObjects():
136  self.removeAttachedObject(name, True)
137  self.waitForSync()
138 
139  ## @brief Make a mesh collision object
140  ## @param name Name of the object
141  ## @param pose A geometry_msgs/Pose for the object
142  ## @param filename The mesh file to load
143  def makeMesh(self, name, pose, filename):
144  if not use_pyassimp:
145  rospy.logerr('pyassimp is broken on your platform, cannot load meshes')
146  return
147  scene = pyassimp.load(filename)
148  if not scene.meshes:
149  rospy.logerr('Unable to load mesh')
150  return
151 
152  mesh = Mesh()
153  for face in scene.meshes[0].faces:
154  triangle = MeshTriangle()
155  if hasattr(face, 'indices'):
156  if len(face.indices) == 3:
157  triangle.vertex_indices = [face.indices[0],
158  face.indices[1],
159  face.indices[2]]
160  else:
161  if len(face) == 3:
162  triangle.vertex_indices = [face[0],
163  face[1],
164  face[2]]
165  mesh.triangles.append(triangle)
166  for vertex in scene.meshes[0].vertices:
167  point = Point()
168  point.x = vertex[0]
169  point.y = vertex[1]
170  point.z = vertex[2]
171  mesh.vertices.append(point)
172  pyassimp.release(scene)
173 
174  o = CollisionObject()
175  o.header.stamp = rospy.Time.now()
176  o.header.frame_id = self._fixed_frame
177  o.id = name
178  o.meshes.append(mesh)
179  o.mesh_poses.append(pose)
180  o.operation = o.ADD
181  return o
182 
183  ## @brief Make a solid primitive collision object
184  ## @param name Name of the object
185  ## @param solid The solid primitive to add
186  ## @param pose A geometry_msgs/Pose for the object
187  ## @param frame_id Optional frame for the pose, otherwise fixed_frame is used
188  def makeSolidPrimitive(self, name, solid, pose, frame_id=None):
189  o = CollisionObject()
190  o.header.stamp = rospy.Time.now()
191  if frame_id:
192  o.header.frame_id = frame_id
193  else:
194  o.header.frame_id = self._fixed_frame
195  o.id = name
196  o.primitives.append(solid)
197  o.primitive_poses.append(pose)
198  o.operation = o.ADD
199  return o
200 
201  ## @brief Make an attachedCollisionObject
202  def makeAttached(self, link_name, obj, touch_links, detach_posture,
203  weight):
204  o = AttachedCollisionObject()
205  o.link_name = link_name
206  o.object = obj
207  if touch_links:
208  o.touch_links = touch_links
209  if detach_posture:
210  o.detach_posture = detach_posture
211  o.weight = weight
212  return o
213 
214  ## @brief Insert a mesh into the planning scene
215  ## @param name Name of the object
216  ## @param pose A geometry_msgs/Pose for the object
217  ## @param filename The mesh file to load
218  ## @param use_service If true, update will be sent via apply service
219  def addMesh(self, name, pose, filename, use_service=True):
220  o = self.makeMesh(name, pose, filename)
221  self._objects[name] = o
222  self.sendUpdate(o, None, use_service)
223 
224  ## @brief Attach a mesh into the planning scene
225  ## @param name Name of the object
226  ## @param pose A geometry_msgs/Pose for the object
227  ## @param filename The mesh file to load
228  ## @param use_service If true, update will be sent via apply service
229  def attachMesh(self, name, pose, filename, link_name, touch_links=None,
230  detach_posture=None, weight=0.0, use_service=True):
231  o = self.makeMesh(name, pose, filename)
232  o.header.frame_id = link_name
233  a = self.makeAttached(link_name, o, touch_links, detach_posture,
234  weight)
235  self._attached_objects[name] = a
236  self.sendUpdate(None, a, use_service)
237 
238  ## @brief Insert a solid primitive into planning scene
239  ## @param name The name of the object
240  ## @param solid An instance of shape_msgs/SolidPrimitive to add
241  ## @param pose A geometry_msgs/Pose for the object
242  ## @param use_service If true, update will be sent via apply service
243  ## @param frame_id Optional frame for the pose, otherwise fixed_frame is used
244  def addSolidPrimitive(self, name, solid, pose, use_service=True, frame_id=None):
245  o = self.makeSolidPrimitive(name, solid, pose, frame_id)
246  self._objects[name] = o
247  self.sendUpdate(o, None, use_service)
248 
249  ## @brief Insert new cylinder into planning scene
250  ## @param height The height of the cylinder
251  ## @param radius The radius of the cylinder
252  ## @param x The x position in fixed frame
253  ## @param y The y position in fixed frame
254  ## @param z The z position in fixed frame
255  ## @param use_service If true, update will be sent via apply service
256  ## @param frame_id Optional frame for the pose, otherwise fixed_frame is used
257  def addCylinder(self, name, height, radius, x, y, z, use_service=True, frame_id=None):
258  s = SolidPrimitive()
259  s.dimensions = [height, radius]
260  s.type = s.CYLINDER
261 
262  ps = PoseStamped()
263  ps.header.frame_id = self._fixed_frame
264  ps.pose.position.x = x
265  ps.pose.position.y = y
266  ps.pose.position.z = z
267  ps.pose.orientation.w = 1.0
268 
269  self.addSolidPrimitive(name, s, ps.pose, use_service, frame_id)
270 
271  ## @brief Insert new sphere into planning scene
272  ## @param radius The radius of the sphere
273  ## @param x The x position in fixed frame
274  ## @param y The y position in fixed frame
275  ## @param z The z position in fixed frame
276  ## @param use_service If true, update will be sent via apply service
277  ## @param frame_id Optional frame for the pose, otherwise fixed_frame is used
278  def addSphere(self, name, radius, x, y, z, use_service=True, frame_id=None):
279  s = SolidPrimitive()
280  s.dimensions = [radius]
281  s.type = s.SPHERE
282 
283  ps = PoseStamped()
284  ps.header.frame_id = self._fixed_frame
285  ps.pose.position.x = x
286  ps.pose.position.y = y
287  ps.pose.position.z = z
288  ps.pose.orientation.w = 1.0
289 
290  self.addSolidPrimitive(name, s, ps.pose, use_service, frame_id)
291 
292  ## @brief Insert new box into planning scene
293  ## @param name Name of the object
294  ## @param size_x The x-dimensions size of the box
295  ## @param size_y The y-dimensions size of the box
296  ## @param size_z The z-dimensions size of the box
297  ## @param x The x position in fixed frame
298  ## @param y The y position in fixed frame
299  ## @param z The z position in fixed frame
300  ## @param use_service If true, update will be sent via apply service
301  ## @param frame_id Optional frame for the pose, otherwise fixed_frame is used
302  def addBox(self, name, size_x, size_y, size_z, x, y, z, use_service=True, frame_id=None):
303  s = SolidPrimitive()
304  s.dimensions = [size_x, size_y, size_z]
305  s.type = s.BOX
306 
307  ps = PoseStamped()
308  ps.header.frame_id = self._fixed_frame
309  ps.pose.position.x = x
310  ps.pose.position.y = y
311  ps.pose.position.z = z
312  ps.pose.orientation.w = 1.0
313 
314  self.addSolidPrimitive(name, s, ps.pose, use_service)
315 
316  ## @brief Attach a box into the planning scene
317  ## @param name Name of the object
318  ## @param size_x The x-dimensions size of the box
319  ## @param size_y The y-dimensions size of the box
320  ## @param size_z The z-dimensions size of the box
321  ## @param x The x position in link_name frame
322  ## @param y The y position in link_name frame
323  ## @param z The z position in link_name frame
324  ## @param link_name Name of link to attach this object to
325  ## @param touch_links Names of robot links that can touch this object
326  ## @param use_service If true, update will be sent via apply service
327  def attachBox(self, name, size_x, size_y, size_z, x, y, z, link_name,
328  touch_links=None, detach_posture=None, weight=0.0,
329  use_service=True):
330  s = SolidPrimitive()
331  s.dimensions = [size_x, size_y, size_z]
332  s.type = s.BOX
333 
334  p = Pose()
335  p.position.x = x
336  p.position.y = y
337  p.position.z = z
338  p.orientation.w = 1.0
339  o = self.makeSolidPrimitive(name, s, p)
340  o.header.frame_id = link_name
341  a = self.makeAttached(link_name, o, touch_links, detach_posture, weight)
342  self._attached_objects[name] = a
343  self.sendUpdate(None, a, use_service)
344 
345  ## @brief Insert new cube to planning scene
346  ## @param use_service If true, update will be sent via apply service
347  ## @param frame_id Optional frame for the pose, otherwise fixed_frame is used
348  def addCube(self, name, size, x, y, z, use_service=True, frame_id=None):
349  self.addBox(name, size, size, size, x, y, z, use_service, frame_id)
350 
351  ## @brief Send message to remove object
352  ## @param use_service If true, update will be sent via apply service
353  def removeCollisionObject(self, name, use_service=True):
354  """ Remove an object. """
355  o = CollisionObject()
356  o.header.stamp = rospy.Time.now()
357  o.header.frame_id = self._fixed_frame
358  o.id = name
359  o.operation = o.REMOVE
360 
361  try:
362  del self._objects[name]
363  self._removed[name] = o
364  except KeyError:
365  pass
366 
367  self.sendUpdate(o, None, use_service)
368 
369  ## @brief Send message to remove object
370  ## @param use_service If true, update will be sent via apply service
371  def removeAttachedObject(self, name, use_service=True):
372  """ Remove an attached object. """
373  o = AttachedCollisionObject()
374  o.object.operation = CollisionObject.REMOVE
375  o.object.id = name
376 
377  try:
378  del self._attached_objects[name]
379  self._attached_removed[name] = o
380  except KeyError:
381  pass
382 
383  self.sendUpdate(None, o, use_service)
384 
385  ## @brief Update the object lists from a PlanningScene message
386  def sceneCb(self, msg, initial=False):
387  """ Recieve updates from move_group. """
388  self._mutex.acquire()
389  for obj in msg.world.collision_objects:
390  try:
391  if obj.operation == obj.ADD:
392  self._collision.append(obj.id)
393  rospy.logdebug('ObjectManager: Added Collision Obj "%s"',
394  obj.id)
395  if initial:
396  # this is our initial planning scene, hold onto each object
397  self._objects[obj.id] = obj
398  elif obj.operation == obj.REMOVE:
399  self._collision.remove(obj.id)
400  self._removed.pop(obj.id, None)
401  rospy.logdebug('ObjectManager: Removed Collision Obj "%s"',
402  obj.id)
403  except ValueError:
404  pass
405  self._attached = list()
406  for obj in msg.robot_state.attached_collision_objects:
407  rospy.logdebug('ObjectManager: attached collision Obj "%s"',
408  obj.object.id)
409  self._attached.append(obj.object.id)
410  if initial:
411  # this is our initial planning scene, hold onto each object
412  self._attached_objects[obj.object.id] = obj
413  self._mutex.release()
414 
415  ## @brief Get a list of names of collision objects
417  self._mutex.acquire()
418  l = copy.deepcopy(self._collision)
419  self._mutex.release()
420  return l
421 
422  ## @brief Get a list of names of attached objects
424  self._mutex.acquire()
425  l = copy.deepcopy(self._attached)
426  self._mutex.release()
427  return l
428 
429  ## @brief Wait for sync
430  def waitForSync(self, max_time=2.0):
431  sync = False
432  t = rospy.Time.now()
433  while not sync:
434  sync = True
435  # delete objects that should be gone
436  for name in self._collision:
437  if name in self._removed.keys():
438  # should be removed, is not
439  rospy.logwarn('ObjectManager: %s not removed yet', name)
440  self.removeCollisionObject(name, False)
441  sync = False
442  for name in self._attached:
443  if name in self._attached_removed.keys():
444  # should be removed, is not
445  rospy.logwarn('ObjectManager: Attached object name: %s not removed yet', name)
446  self.removeAttachedObject(name, False)
447  sync = False
448  # add missing objects
449  for name in self._objects.keys():
450  if name not in self._collision + self._attached:
451  rospy.logwarn('ObjectManager: %s not added yet', name)
452  self.sendUpdate(self._objects[name], None, False)
453  sync = False
454  for name in self._attached_objects.keys():
455  if name not in self._attached:
456  rospy.logwarn('ObjectManager: %s not attached yet', name)
457  self.sendUpdate(None, self._attached_objects[name], False)
458  sync = False
459  # timeout
460  if rospy.Time.now() - t > rospy.Duration(max_time):
461  rospy.logerr('ObjectManager: sync timed out.')
462  break
463  rospy.logdebug('ObjectManager: waiting for sync.')
464  rospy.sleep(0.1)
465 
466  ## @brief Set the color of an object
467  def setColor(self, name, r, g, b, a=0.9):
468  # Create our color
469  color = ObjectColor()
470  color.id = name
471  color.color.r = r
472  color.color.g = g
473  color.color.b = b
474  color.color.a = a
475  self._colors[name] = color
476 
477  ## @brief Actually send the colors to MoveIt!
478  def sendColors(self):
479  # Need to send a planning scene diff
480  p = PlanningScene()
481  p.is_diff = True
482  for color in self._colors.values():
483  p.object_colors.append(color)
484  resp = self._apply_service.call(p)
485  if not resp.success:
486  rospy.logerr("Could not update colors through service, using topic instead.")
487  self._scene_pub.publish(p)
def sendUpdate(self, collision_object, attached_collision_object, use_service=True)
Send new update to planning scene.
def makeSolidPrimitive(self, name, solid, pose, frame_id=None)
Make a solid primitive collision object.
def removeCollisionObject(self, name, use_service=True)
Send message to remove object.
def addCylinder(self, name, height, radius, x, y, z, use_service=True, frame_id=None)
Insert new cylinder into planning scene.
def sceneCb(self, msg, initial=False)
Update the object lists from a PlanningScene message.
def addSolidPrimitive(self, name, solid, pose, use_service=True, frame_id=None)
Insert a solid primitive into planning scene.
def getKnownCollisionObjects(self)
Get a list of names of collision objects.
def attachMesh(self, name, pose, filename, link_name, touch_links=None, detach_posture=None, weight=0.0, use_service=True)
Attach a mesh into the planning scene.
def addCube(self, name, size, x, y, z, use_service=True, frame_id=None)
Insert new cube to planning scene.
def clear(self)
Clear the planning scene of all objects.
def setColor(self, name, r, g, b, a=0.9)
Set the color of an object.
def addSphere(self, name, radius, x, y, z, use_service=True, frame_id=None)
Insert new sphere into planning scene.
def getKnownAttachedObjects(self)
Get a list of names of attached objects.
def makeMesh(self, name, pose, filename)
Make a mesh collision object.
def attachBox(self, name, size_x, size_y, size_z, x, y, z, link_name, touch_links=None, detach_posture=None, weight=0.0, use_service=True)
Attach a box into the planning scene.
def makeAttached(self, link_name, obj, touch_links, detach_posture, weight)
Make an attachedCollisionObject.
def addMesh(self, name, pose, filename, use_service=True)
Insert a mesh into the planning scene.
def addBox(self, name, size_x, size_y, size_z, x, y, z, use_service=True, frame_id=None)
Insert new box into planning scene.
def __init__(self, frame, ns='', init_from_service=True)
A class for managing the state of the planning scene.
def removeAttachedObject(self, name, use_service=True)
Send message to remove object.


moveit_python
Author(s): Michael Ferguson
autogenerated on Sat Feb 13 2021 03:22:05