__init__.py
Go to the documentation of this file.
1 # Software License Agreement (BSD License)
2 #
3 # Copyright (c) 2010, Willow Garage, Inc.
4 # All rights reserved.
5 #
6 # Redistribution and use in source and binary forms, with or without
7 # modification, are permitted provided that the following conditions
8 # are met:
9 #
10 # * Redistributions of source code must retain the above copyright
11 # notice, this list of conditions and the following disclaimer.
12 # * Redistributions in binary form must reproduce the above
13 # copyright notice, this list of conditions and the following
14 # disclaimer in the documentation and/or other materials provided
15 # with the distribution.
16 # * Neither the name of Willow Garage, Inc. nor the names of its
17 # contributors may be used to endorse or promote products derived
18 # from this software without specific prior written permission.
19 #
20 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
23 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
24 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
25 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
26 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
27 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
28 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
29 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
30 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
31 # POSSIBILITY OF SUCH DAMAGE.
32 
33 import math
34 import sys
35 import xml.dom.minidom
36 
37 import rospy
38 import sensor_msgs.msg
39 
40 
41 def get_param(name, value=None):
42  private = "~%s" % name
43  if rospy.has_param(private):
44  return rospy.get_param(private)
45  elif rospy.has_param(name):
46  return rospy.get_param(name)
47  else:
48  return value
49 
50 
52  def init_collada(self, robot):
53  robot = robot.getElementsByTagName('kinematics_model')[0].getElementsByTagName('technique_common')[0]
54  for child in robot.childNodes:
55  if child.nodeType is child.TEXT_NODE:
56  continue
57  if child.localName == 'joint':
58  name = child.getAttribute('name')
59  if child.getElementsByTagName('revolute'):
60  joint = child.getElementsByTagName('revolute')[0]
61  else:
62  rospy.logwarn("Unknown joint type %s", child)
63  continue
64 
65  if joint:
66  limit = joint.getElementsByTagName('limits')[0]
67  minval = float(limit.getElementsByTagName('min')[0].childNodes[0].nodeValue)
68  maxval = float(limit.getElementsByTagName('max')[0].childNodes[0].nodeValue)
69  if minval == maxval: # this is fixed joint
70  continue
71 
72  self.joint_list.append(name)
73  joint = {'min':minval*math.pi/180.0, 'max':maxval*math.pi/180.0, 'zero':0, 'position':0, 'velocity':0, 'effort':0}
74  self.free_joints[name] = joint
75 
76  def init_urdf(self, robot):
77  robot = robot.getElementsByTagName('robot')[0]
78  # Find all non-fixed joints
79  for child in robot.childNodes:
80  if child.nodeType is child.TEXT_NODE:
81  continue
82  if child.localName == 'joint':
83  jtype = child.getAttribute('type')
84  if jtype in ['fixed', 'floating', 'planar']:
85  continue
86  name = child.getAttribute('name')
87  self.joint_list.append(name)
88  if jtype == 'continuous':
89  minval = -math.pi
90  maxval = math.pi
91  else:
92  try:
93  limit = child.getElementsByTagName('limit')[0]
94  minval = float(limit.getAttribute('lower'))
95  maxval = float(limit.getAttribute('upper'))
96  except:
97  rospy.logwarn("%s is not fixed, nor continuous, but limits are not specified!" % name)
98  continue
99 
100  safety_tags = child.getElementsByTagName('safety_controller')
101  if self.use_small and len(safety_tags) == 1:
102  tag = safety_tags[0]
103  if tag.hasAttribute('soft_lower_limit'):
104  minval = max(minval, float(tag.getAttribute('soft_lower_limit')))
105  if tag.hasAttribute('soft_upper_limit'):
106  maxval = min(maxval, float(tag.getAttribute('soft_upper_limit')))
107 
108  mimic_tags = child.getElementsByTagName('mimic')
109  if self.use_mimic and len(mimic_tags) == 1:
110  tag = mimic_tags[0]
111  entry = {'parent': tag.getAttribute('joint')}
112  if tag.hasAttribute('multiplier'):
113  entry['factor'] = float(tag.getAttribute('multiplier'))
114  if tag.hasAttribute('offset'):
115  entry['offset'] = float(tag.getAttribute('offset'))
116 
117  self.dependent_joints[name] = entry
118  continue
119 
120  if name in self.dependent_joints:
121  continue
122 
123  zeroval = get_param("zeros/" + name)
124  if not zeroval:
125  if minval > 0 or maxval < 0:
126  zeroval = (maxval + minval)/2
127  else:
128  zeroval = 0
129 
130  joint = {'min': minval, 'max': maxval, 'zero': zeroval}
131  if self.pub_def_positions:
132  joint['position'] = zeroval
133  if self.pub_def_vels:
134  joint['velocity'] = 0.0
135  if self.pub_def_efforts:
136  joint['effort'] = 0.0
137 
138  if jtype == 'continuous':
139  joint['continuous'] = True
140  self.free_joints[name] = joint
141 
142  def __init__(self):
143  description = get_param('robot_description')
144  if description is None:
145  raise RuntimeError('The robot_description parameter is required and not set.')
146 
147  self.free_joints = {}
148  self.joint_list = [] # for maintaining the original order of the joints
149  self.dependent_joints = get_param("dependent_joints", {})
150  self.use_mimic = get_param('use_mimic_tags', True)
151  self.use_small = get_param('use_smallest_joint_limits', True)
152 
153 
154  self.pub_def_positions = get_param("publish_default_positions", True)
155  self.pub_def_vels = get_param("publish_default_velocities", False)
156  self.pub_def_efforts = get_param("publish_default_efforts", False)
157 
158  robot = xml.dom.minidom.parseString(description)
159  if robot.getElementsByTagName('COLLADA'):
160  self.init_collada(robot)
161  else:
162  self.init_urdf(robot)
163 
164  # The source_update_cb will be called at the end of self.source_cb.
165  # The main purpose it to allow external observes (such as the
166  # joint_state_publisher_gui) to be notified when things are updated.
167  self.source_update_cb = None
168 
169  source_list = get_param("source_list", [])
170  self.sources = []
171  for source in source_list:
172  self.sources.append(rospy.Subscriber(source, sensor_msgs.msg.JointState, self.source_cb))
173 
174  self.pub = rospy.Publisher('joint_states', sensor_msgs.msg.JointState, queue_size=5)
175 
176  def source_cb(self, msg):
177  for i in range(len(msg.name)):
178  name = msg.name[i]
179  if name not in self.free_joints:
180  continue
181 
182  if msg.position:
183  position = msg.position[i]
184  else:
185  position = None
186  if msg.velocity:
187  velocity = msg.velocity[i]
188  else:
189  velocity = None
190  if msg.effort:
191  effort = msg.effort[i]
192  else:
193  effort = None
194 
195  joint = self.free_joints[name]
196  if position is not None:
197  joint['position'] = position
198  if velocity is not None:
199  joint['velocity'] = velocity
200  if effort is not None:
201  joint['effort'] = effort
202 
203  if self.source_update_cb is not None:
204  self.source_update_cb()
205 
206  def set_source_update_cb(self, user_cb):
207  self.source_update_cb = user_cb
208 
209  def loop(self):
210  hz = get_param("rate", 10) # 10hz
211  r = rospy.Rate(hz)
212 
213  delta = get_param("delta", 0.0)
214 
215  # Publish Joint States
216  while not rospy.is_shutdown():
217  msg = sensor_msgs.msg.JointState()
218  msg.header.stamp = rospy.Time.now()
219 
220  if delta > 0:
221  self.update(delta)
222 
223  # Initialize msg.position, msg.velocity, and msg.effort.
224  has_position = len(self.dependent_joints.items()) > 0
225  has_velocity = False
226  has_effort = False
227  for name, joint in self.free_joints.items():
228  if not has_position and 'position' in joint:
229  has_position = True
230  if not has_velocity and 'velocity' in joint:
231  has_velocity = True
232  if not has_effort and 'effort' in joint:
233  has_effort = True
234  num_joints = (len(self.free_joints.items()) +
235  len(self.dependent_joints.items()))
236  if has_position:
237  msg.position = num_joints * [0.0]
238  if has_velocity:
239  msg.velocity = num_joints * [0.0]
240  if has_effort:
241  msg.effort = num_joints * [0.0]
242 
243  for i, name in enumerate(self.joint_list):
244  msg.name.append(str(name))
245  joint = None
246 
247  # Add Free Joint
248  if name in self.free_joints:
249  joint = self.free_joints[name]
250  factor = 1
251  offset = 0
252  # Add Dependent Joint
253  elif name in self.dependent_joints:
254  param = self.dependent_joints[name]
255  parent = param['parent']
256  factor = param.get('factor', 1)
257  offset = param.get('offset', 0)
258  # Handle recursive mimic chain
259  recursive_mimic_chain_joints = [name]
260  while parent in self.dependent_joints:
261  if parent in recursive_mimic_chain_joints:
262  error_message = "Found an infinite recursive mimic chain"
263  rospy.logerr("%s: [%s, %s]", error_message, ', '.join(recursive_mimic_chain_joints), parent)
264  sys.exit(1)
265  recursive_mimic_chain_joints.append(parent)
266  param = self.dependent_joints[parent]
267  parent = param['parent']
268  offset += factor * param.get('offset', 0)
269  factor *= param.get('factor', 1)
270  joint = self.free_joints[parent]
271 
272  if has_position and 'position' in joint:
273  msg.position[i] = joint['position'] * factor + offset
274  if has_velocity and 'velocity' in joint:
275  msg.velocity[i] = joint['velocity'] * factor
276  if has_effort and 'effort' in joint:
277  msg.effort[i] = joint['effort']
278 
279  if msg.name or msg.position or msg.velocity or msg.effort:
280  # Only publish non-empty messages
281  self.pub.publish(msg)
282  try:
283  r.sleep()
284  except rospy.exceptions.ROSTimeMovedBackwardsException:
285  pass
286 
287  def update(self, delta):
288  for name, joint in self.free_joints.iteritems():
289  forward = joint.get('forward', True)
290  if forward:
291  joint['position'] += delta
292  if joint['position'] > joint['max']:
293  if joint.get('continuous', False):
294  joint['position'] = joint['min']
295  else:
296  joint['position'] = joint['max']
297  joint['forward'] = not forward
298  else:
299  joint['position'] -= delta
300  if joint['position'] < joint['min']:
301  joint['position'] = joint['min']
302  joint['forward'] = not forward
joint_state_publisher.get_param
def get_param(name, value=None)
Definition: __init__.py:41
joint_state_publisher.JointStatePublisher.update
def update(self, delta)
Definition: __init__.py:287
joint_state_publisher.JointStatePublisher.pub
pub
Definition: __init__.py:174
joint_state_publisher.JointStatePublisher.source_cb
def source_cb(self, msg)
Definition: __init__.py:176
joint_state_publisher.JointStatePublisher.use_small
use_small
Definition: __init__.py:151
joint_state_publisher.JointStatePublisher.set_source_update_cb
def set_source_update_cb(self, user_cb)
Definition: __init__.py:206
joint_state_publisher.JointStatePublisher.__init__
def __init__(self)
Definition: __init__.py:142
joint_state_publisher.JointStatePublisher.init_urdf
def init_urdf(self, robot)
Definition: __init__.py:76
joint_state_publisher.JointStatePublisher.pub_def_efforts
pub_def_efforts
Definition: __init__.py:156
joint_state_publisher.JointStatePublisher.pub_def_vels
pub_def_vels
Definition: __init__.py:155
joint_state_publisher.JointStatePublisher.joint_list
joint_list
Definition: __init__.py:148
joint_state_publisher.JointStatePublisher.init_collada
def init_collada(self, robot)
Definition: __init__.py:52
joint_state_publisher.JointStatePublisher.sources
sources
Definition: __init__.py:170
joint_state_publisher.JointStatePublisher.dependent_joints
dependent_joints
Definition: __init__.py:149
joint_state_publisher.JointStatePublisher.free_joints
free_joints
Definition: __init__.py:147
joint_state_publisher.JointStatePublisher.source_update_cb
source_update_cb
Definition: __init__.py:167
joint_state_publisher.JointStatePublisher.pub_def_positions
pub_def_positions
Definition: __init__.py:154
joint_state_publisher.JointStatePublisher
Definition: __init__.py:51
joint_state_publisher.JointStatePublisher.use_mimic
use_mimic
Definition: __init__.py:150
joint_state_publisher.JointStatePublisher.loop
def loop(self)
Definition: __init__.py:209


joint_state_publisher
Author(s): David V. Lu!! , Jackie Kay
autogenerated on Sat Apr 12 2025 02:38:33