bvh_broadcaster.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import tf
3 import math
4 import numpy
5 import rospy
6 import string
7 import argparse
8 
9 # Node
10 class Node:
11  def __init__(self, root=False):
12  self.name = None
13  self.channels = []
14  self.offset = (0,0,0)
15  self.children = []
16  self._is_root = root
17 
18  def isRoot(self):
19  return self._is_root
20 
21  def isEndSite(self):
22  return len(self.children)==0
23 
24 
25 # BVHReader
26 class BVHReader:
27  def __init__(self, filename):
28 
29  self.filename = filename
30  # A list of unprocessed tokens (strings)
31  self.tokenlist = []
32  # The current line number
33  self.linenr = 0
34 
35  # Root node
36  self._root = None
37  self._nodestack = []
38 
39  # Total number of channels
40  self._numchannels = 0
41 
42  def onHierarchy(self, root):
43  pass
44 
45  def onMotion(self, frames, dt):
46  pass
47 
48  def onFrame(self, values):
49  pass
50 
51  # read
52  def read(self):
53  """Read the entire file.
54  """
55  self.fhandle = file(self.filename)
56 
57  self.readHierarchy()
58  self.onHierarchy(self._root)
59  self.readMotion()
60 
61  # readMotion
62  def readMotion(self):
63  """Read the motion samples.
64  """
65  # No more tokens (i.e. end of file)? Then just return
66  try:
67  tok = self.token()
68  except StopIteration:
69  return
70 
71  if tok!="MOTION":
72  raise SyntaxError("Syntax error in line %d: 'MOTION' expected, got '%s' instead"%(self.linenr, tok))
73 
74  # Read the number of frames
75  tok = self.token()
76  if tok!="Frames:":
77  raise SyntaxError("Syntax error in line %d: 'Frames:' expected, got '%s' instead"%(self.linenr, tok))
78 
79  frames = self.intToken()
80 
81  # Read the frame time
82  tok = self.token()
83  if tok!="Frame":
84  raise SyntaxError("Syntax error in line %d: 'Frame Time:' expected, got '%s' instead"%(self.linenr, tok))
85  tok = self.token()
86  if tok!="Time:":
87  raise SyntaxError("Syntax error in line %d: 'Frame Time:' expected, got 'Frame %s' instead"%(self.linenr, tok))
88 
89  dt = self.floatToken()
90 
91  self.onMotion(frames, dt)
92 
93  # Read the channel values
94  for i in range(frames):
95  s = self.readLine()
96  a = s.split()
97  if len(a)!=self._numchannels:
98  raise SyntaxError("Syntax error in line %d: %d float values expected, got %d instead"%(self.linenr, self._numchannels, len(a)))
99  values = map(lambda x: float(x), a)
100  self.onFrame(values)
101 
102 
103  # readHierarchy
104  def readHierarchy(self):
105  """Read the skeleton hierarchy.
106  """
107  tok = self.token()
108  if tok!="HIERARCHY":
109  raise SyntaxError("Syntax error in line %d: 'HIERARCHY' expected, got '%s' instead"%(self.linenr, tok))
110 
111  tok = self.token()
112  if tok!="ROOT":
113  raise SyntaxError("Syntax error in line %d: 'ROOT' expected, got '%s' instead"%(self.linenr, tok))
114 
115  self._root = Node(root=True)
116  self._nodestack.append(self._root)
117  self.readNode()
118 
119  # readNode
120  def readNode(self):
121  """Read the data for a node.
122  """
123 
124  # Read the node name (or the word 'Site' if it was a 'End Site'
125  # node)
126  name = self.token()
127  self._nodestack[-1].name = name
128 
129  tok = self.token()
130  if tok!="{":
131  raise SyntaxError("Syntax error in line %d: '{' expected, got '%s' instead"%(self.linenr, tok))
132 
133  while 1:
134  tok = self.token()
135  if tok=="OFFSET":
136  x = self.floatToken()
137  y = self.floatToken()
138  z = self.floatToken()
139  self._nodestack[-1].offset = (x,y,z)
140  elif tok=="CHANNELS":
141  n = self.intToken()
142  channels = []
143  for i in range(n):
144  tok = self.token()
145  if tok not in ["Xposition", "Yposition", "Zposition",
146  "Xrotation", "Yrotation", "Zrotation"]:
147  raise SyntaxError("Syntax error in line %d: Invalid channel name: '%s'"%(self.linenr, tok))
148  channels.append(tok)
149  self._numchannels += len(channels)
150  self._nodestack[-1].channels = channels
151  elif tok=="JOINT":
152  node = Node()
153  self._nodestack[-1].children.append(node)
154  self._nodestack.append(node)
155  self.readNode()
156  elif tok=="End":
157  node = Node()
158  self._nodestack[-1].children.append(node)
159  self._nodestack.append(node)
160  self.readNode()
161  elif tok=="}":
162  if self._nodestack[-1].isEndSite():
163  self._nodestack[-1].name = "End Site"
164  self._nodestack.pop()
165  break
166  else:
167  raise SyntaxError("Syntax error in line %d: Unknown keyword '%s'"%(self.linenr, tok))
168 
169 
170  # intToken
171  def intToken(self):
172  """Return the next token which must be an int.
173  """
174 
175  tok = self.token()
176  try:
177  return int(tok)
178  except ValueError:
179  raise SyntaxError("Syntax error in line %d: Integer expected, got '%s' instead"%(self.linenr, tok))
180 
181  # floatToken
182  def floatToken(self):
183  """Return the next token which must be a float.
184  """
185 
186  tok = self.token()
187  try:
188  return float(tok)
189  except ValueError:
190  raise SyntaxError("Syntax error in line %d: Float expected, got '%s' instead"%(self.linenr, tok))
191 
192  # token
193  def token(self):
194  """Return the next token."""
195 
196  # Are there still some tokens left? then just return the next one
197  if self.tokenlist!=[]:
198  tok = self.tokenlist[0]
199  self.tokenlist = self.tokenlist[1:]
200  return tok
201 
202  # Read a new line
203  s = self.readLine()
204  self.createTokens(s)
205  return self.token()
206 
207  # readLine
208  def readLine(self):
209  """Return the next line.
210 
211  Empty lines are skipped. If the end of the file has been
212  reached, a StopIteration exception is thrown. The return
213  value is the next line containing data (this will never be an
214  empty string).
215  """
216  # Discard any remaining tokens
217  self.tokenlist = []
218 
219  # Read the next line
220  while 1:
221  s = self.fhandle.readline()
222  self.linenr += 1
223  if s=="":
224  raise StopIteration
225  return s
226 
227  # createTokens
228  def createTokens(self, s):
229  """Populate the token list from the content of s.
230  """
231  s = s.strip()
232  a = s.split()
233  self.tokenlist = a
234 
236  def __init__(self, filename, root_frame):
237  BVHReader.__init__(self, filename)
238  self.br = tf.TransformBroadcaster()
239  self.all_motions = []
240  self.dt = 1
241  self.num_motions = 1
242  self.root_frame = root_frame
243 
244  self.counter = 0
245  self.this_motion = None
246 
247  self.scaling_factor = 0.1
248 
249  def onHierarchy(self, root):
250  self.scaling_factor = 0.1/root.children[0].children[0].offset[0]
251 
252  def onMotion(self, frames, dt):
253  self.dt = dt
254  self.num_motions = frames
255 
256  def onFrame(self, values):
257  self.all_motions.append(values)
258 
259  def broadcastRootJoint(self, root, parent_frame):
260  if root.isEndSite():
261  return
262 
263  num_channels = len(root.channels)
264 
265  flag_trans = 0
266  flag_rot = 0
267 
268  mat_rot = numpy.array([ [1.,0.,0.,0.],
269  [0.,1.,0.,0.],
270  [0.,0.,1.,0.],
271  [0.,0.,0.,1.] ])
272 
273  for channel in root.channels:
274  keyval = self.this_motion[self.counter]
275  if(channel == "Xposition"):
276  flag_trans = True
277  x = keyval
278  elif(channel == "Yposition"):
279  flag_trans = True
280  y = keyval
281  elif(channel == "Zposition"):
282  flag_trans = True
283  z = keyval
284  elif(channel == "Xrotation"):
285  flag_rot = True
286  xrot = keyval
287  theta = math.radians(xrot)
288  c = math.cos(theta)
289  s = math.sin(theta)
290  mat_x_rot = numpy.array([ [1.,0.,0.,0.],
291  [0., c,-s,0.],
292  [0., s, c,0.],
293  [0.,0.,0.,1.] ])
294  mat_rot = numpy.matmul(mat_rot, mat_x_rot)
295 
296  elif(channel == "Yrotation"):
297  flag_rot = True
298  yrot = keyval
299  theta = math.radians(yrot)
300  c = math.cos(theta)
301  s = math.sin(theta)
302  mat_y_rot = numpy.array([ [ c,0., s,0.],
303  [0.,1.,0.,0.],
304  [-s,0., c,0.],
305  [0.,0.,0.,1.] ])
306  mat_rot = numpy.matmul(mat_rot, mat_y_rot)
307 
308  elif(channel == "Zrotation"):
309  flag_rot = True
310  zrot = keyval
311  theta = math.radians(zrot)
312  c = math.cos(theta)
313  s = math.sin(theta)
314  mat_z_rot = numpy.array([ [ c,-s,0.,0.],
315  [ s, c,0.,0.],
316  [0.,0.,1.,0.],
317  [0.,0.,0.,1.] ])
318  mat_rot = numpy.matmul(mat_rot, mat_z_rot)
319  else:
320  return
321  self.counter += 1
322 
323  if flag_trans:
324  temp_trans = (self.scaling_factor * (x + root.offset[0]),
325  self.scaling_factor * (y + root.offset[1]),
326  self.scaling_factor * (z + root.offset[2]))
327  else:
328  temp_trans = (self.scaling_factor * (root.offset[0]),
329  self.scaling_factor * (root.offset[1]),
330  self.scaling_factor * (root.offset[2]))
331 
332  temp_rot = tf.transformations.quaternion_from_matrix(mat_rot)
333 
334  self.br.sendTransform(temp_trans, temp_rot, rospy.Time.now(), root.name, parent_frame)
335 
336  for each_child in root.children:
337  self.broadcastRootJoint(each_child, root.name)
338 
339  def broadcast(self, loop=False):
340  self.read()
341  rate = rospy.Rate(1/self.dt)
342 
343  while not rospy.is_shutdown():
344  for ind in range(self.num_motions):
345  self.counter = 0
346  self.this_motion = self.all_motions[ind]
347 
348  self.broadcastRootJoint(self._root, self.root_frame)
349  if rospy.is_shutdown():
350  break
351  rate.sleep()
352  if not loop:
353  break
354 
356  parser = argparse.ArgumentParser("python BVHBroadcaster.py")
357  parser.add_argument('bvh_file', help="A path to bvh file that you want to broadcast")
358  parser.add_argument('base_frame', help="An existing frame in rviz on which the skeleton will be loaded")
359  parser.add_argument('-n', '--name', help="Node name, default: BVHBroadcaster", default="BVHBroadcaster")
360  parser.add_argument('-l', '--loop', help="Loop broadcasting", action="store_true")
361  return parser.parse_args()
362 
363 def main(args):
364  rospy.init_node(args.name)
365  # file_name = "/home/mingfei/Documents/RobotManipulationProject/mocap/62/62_07.bvh"
366  bvh_test = BVHBroadcaster(args.bvh_file, args.base_frame)
367  rospy.loginfo("Broadcasting bvh file (%s) on frame %s"%(args.bvh_file, args.base_frame))
368  if args.loop:
369  rospy.loginfo("Loop")
370  else:
371  rospy.loginfo("Only once")
372  bvh_test.broadcast(loop=args.loop)
373  rospy.loginfo("Broadcasting done")
374 
375 def test():
376  rospy.init_node("BVHBroadcaster")
377  file_name = "/home/mingfei/Documents/projects/RobotManipulationProject/mocap/62/62_07.bvh"
378  bvh_test = BVHBroadcaster(file_name, "world")
379  bvh_test.broadcast(loop=True)
380 
381 if __name__ == "__main__":
382  args = argsparser()
383  main(args)
def broadcast(self, loop=False)
def __init__(self, filename, root_frame)
def onMotion(self, frames, dt)
def onMotion(self, frames, dt)
def onHierarchy(self, root)
def __init__(self, filename)
def onFrame(self, values)
def __init__(self, root=False)
def broadcastRootJoint(self, root, parent_frame)


bvh_broadcaster
Author(s): Mingfei Sun
autogenerated on Fri Apr 9 2021 02:09:54