utils.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 # Copyright: Yuki Furuta <furushchev@jsk.imi.i.u-tokyo.ac.jp>
4 
5 # https://stackoverflow.com/questions/11914472/stringio-in-python3
6 try:
7  from cStringIO import StringIO ## for Python 2
8 except ImportError:
9  from io import StringIO ## for Python 3
10 import os
11 import re
12 import sys
13 import rospkg
14 import rospy
15 import subprocess
16 import tempfile
17 from speech_recognition_msgs.msg import Grammar
18 from speech_recognition_msgs.msg import PhraseRule
19 from speech_recognition_msgs.msg import Vocabulary
20 
21 
# Byte-oriented pattern over UTF-8 encoded hiragana. It is no longer used by
# is_hiragana() but is kept so existing references to this name keep working.
_REGEX_HIRAGANA = re.compile(r'^(?:\xE3\x81[\x81-\xBF]|\xE3\x82[\x80-\x93])+$')
# The same character range expressed on decoded text: the UTF-8 sequences
# E3 81 81 .. E3 82 93 are exactly the code points U+3041 .. U+3093.
_REGEX_HIRAGANA_TEXT = re.compile(u'^[\u3041-\u3093]+$')


def is_hiragana(s):
    """Return True if *s* is a non-empty string made only of hiragana.

    Accepts either text or UTF-8 encoded bytes, so the check behaves the same
    on Python 2 (where ``str`` is bytes) and Python 3 (where ``str`` is text).
    Bytes that are not valid UTF-8 are reported as "not hiragana" rather than
    raising.
    """
    if isinstance(s, bytes):
        try:
            s = s.decode('utf-8')
        except UnicodeDecodeError:
            return False
    return _REGEX_HIRAGANA_TEXT.search(s) is not None
27 
28 
def readlines_with_utf8(path):
    """Return the content of *path*, passed through ``nkf -w``, split into lines.

    The file is read by running the external ``nkf`` command, so ``nkf`` must
    be available on PATH.  The output is split on ``os.linesep``; because the
    output normally ends with a line separator, the last element of the
    returned list is usually an empty string.

    On Python 3 the elements are ``bytes`` (the raw output of the subprocess);
    on Python 2 they are ``str``.

    Raises:
        AssertionError: if *path* does not exist.
        subprocess.CalledProcessError: if ``nkf`` exits with a non-zero status.
    """
    assert os.path.exists(path), "file not found: %s" % path
    sep = os.linesep
    if sys.version_info.major >= 3:
        # check_output() returns bytes on Python 3, so split on a bytes separator.
        sep = sep.encode()
    return subprocess.check_output(["nkf", "-w", path]).split(sep)
35 
36 
def load_grammar(path, name):
    """Build a Grammar message from ``<name>.grammar`` and ``<name>.voca``.

    Both files are looked up inside the directory *path* and read through
    readlines_with_utf8().

    * Every non-empty line of the grammar file is split at ``:`` into a symbol
      and a whitespace-separated definition, and becomes one PhraseRule.
    * In the voca file, a line starting with ``%`` opens a new category (and a
      new Vocabulary); every other non-empty line contributes a word (first
      field) and its phonemes (remaining fields joined by single spaces) to
      the Vocabulary currently open.

    On Python 3 the values stored in the message are ``bytes``, because that
    is what readlines_with_utf8() yields there.

    Raises:
        AssertionError: if *path* is not a directory.
    """
    assert os.path.isdir(path)
    is_py3 = sys.version_info.major >= 3
    grammar = Grammar()

    # ---- rules: "<symbol> : <token> <token> ..." ----
    rule_file = os.path.join(path, "%s.grammar" % name)
    colon = ':'
    if is_py3:
        # Lines are bytes on Python 3, so the separator must be bytes too.
        colon = colon.encode()
    for raw in readlines_with_utf8(rule_file):
        line = raw.strip()
        if not line:
            continue
        symbol, definition = line.split(colon)
        rule = PhraseRule()
        rule.symbol = symbol.strip()
        rule.definition = definition.split()
        grammar.rules.append(rule)

    # ---- vocabulary: "% CATEGORY" headers followed by "<word> <phonemes...>" ----
    voca_file = os.path.join(path, "%s.voca" % name)
    current = None
    percent = '%'
    space = ' '
    if is_py3:
        percent = percent.encode()
        space = space.encode()
    for raw in readlines_with_utf8(voca_file):
        line = raw.strip()
        if line.startswith(percent):
            grammar.categories.append(line[1:].strip())
            # Flush the vocabulary of the previous category before opening a new one.
            if current:
                grammar.vocabularies.append(current)
            current = Vocabulary()
            continue
        if not line:
            continue
        fields = line.split()
        current.words.append(fields[0])
        current.phonemes.append(space.join(fields[1:]))
    # The last category has no following header to flush it.
    if current:
        grammar.vocabularies.append(current)
    return grammar
73 
74 
def make_phonemes_from_words(words):
    """Convert *words* to phoneme strings with julius' ``yomi2voca.pl``.

    Each word is sent to ``rosrun julius yomi2voca.pl`` as a ``"<word> <word>"``
    line encoded in EUC-JP.  The second tab-separated field of every output
    line is returned (for example ``"u d o N"`` for ``"うどん"``).

    Args:
        words: iterable of words, either text or UTF-8 encoded bytes.

    Returns:
        A list of native strings, one per output line, or ``None`` when the
        script reports ``Error:`` on stderr.
    """
    cmd = ["rosrun", "julius", "yomi2voca.pl"]
    # Work on decoded text internally so the same code runs on Python 2 and 3
    # (the ``unicode`` builtin does not exist on Python 3).
    text_words = [w.decode('utf-8') if isinstance(w, bytes) else w
                  for w in words]
    stdin = os.linesep.join([u"%s %s" % (w, w) for w in text_words]) + os.linesep
    rospy.logdebug("Executing %s" % cmd)
    p = subprocess.Popen(cmd,
                         stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)
    result, error = p.communicate(stdin.encode('euc-jp'))
    rospy.logdebug("STDOUT: %s" % result)
    rospy.logdebug("STDERR: %s" % error)

    # On Python 2 the native string type is bytes; callers get UTF-8 there.
    native_is_bytes = str is bytes

    if error:
        # Decode leniently: a diagnostic must never fail because of encoding.
        error_text = error.decode('euc-jp', 'replace')
        if u"Error:" in error_text:
            if native_is_bytes:
                error_text = error_text.encode('utf-8')
            rospy.logerr("Error: %s" % error_text)
            return None

    # The output ends with a line separator, so drop the trailing empty chunk.
    lines = result.decode('euc-jp').split(os.linesep)[:-1]
    phonemes = [line.split(u"\t")[1] for line in lines]
    if native_is_bytes:
        phonemes = [ph.encode('utf-8') for ph in phonemes]
    return phonemes
97 
98 
def _rule_text(value):
    """Return *value* as a native string, decoding Python 3 ``bytes`` as UTF-8."""
    # On Python 2 ``bytes`` is ``str`` (already the native type), so only
    # decode when the two types differ.
    if isinstance(value, bytes) and not isinstance(value, str):
        return value.decode('utf-8')
    return value


def make_grammar_from_rules(rules):
    """Serialize *rules* into the text of a ``.grammar`` file.

    Every rule becomes one ``"<symbol>: <definition tokens joined by spaces>"``
    line terminated by ``os.linesep``.  Symbols and definition tokens may be
    text or bytes (also mixed within one rule).  A rule with an empty
    definition yields ``"<symbol>: "`` instead of the ``repr`` of an empty list.

    Args:
        rules: iterable of objects with ``symbol`` and ``definition``
            (sequence of tokens) attributes.

    Returns:
        The grammar text; an empty string when *rules* is empty.
    """
    lines = []
    for r in rules:
        symbol = _rule_text(r.symbol)
        definition = ' '.join(_rule_text(d) for d in r.definition)
        lines.append("{symbol}: {definition}{linesep}".format(
            symbol=symbol,
            definition=definition,
            linesep=os.linesep))
    return ''.join(lines)
116 
117 
def make_voca_from_categories(cats, vocas):
    """Serialize categories and their vocabularies into ``.voca`` file text.

    For every (category, vocabulary) pair a ``"% <category>"`` header line is
    written, followed by one ``"<word><TAB><phoneme>"`` line per word; each
    word line is followed by an empty line.  Categories, words and phonemes
    may be text or bytes.

    When a vocabulary carries no phonemes they are generated from its words
    with make_phonemes_from_words().
    NOTE(review): that function returns None on failure, which makes the
    ``zip`` below raise TypeError -- confirm whether callers rely on that.

    Args:
        cats: sequence of category names.
        vocas: sequence of objects with ``words`` and ``phonemes`` attributes,
            paired with *cats* by position (extra items on either side are
            ignored, as with ``zip``).

    Returns:
        The vocabulary text; an empty string when there are no pairs.
    """
    def as_text(value):
        # On Python 2 ``bytes`` is the native ``str``; only decode real bytes.
        if isinstance(value, bytes) and not isinstance(value, str):
            return value.decode('utf-8')
        return value

    chunks = []
    for c, vs in zip(cats, vocas):
        chunks.append("% {category}{linesep}".format(
            category=as_text(c),
            linesep=os.linesep))
        phonemes = vs.phonemes
        if not phonemes:
            phonemes = make_phonemes_from_words(vs.words)
        for w, p in zip(vs.words, phonemes):
            chunks.append("{word}\t{phoneme}{linesep}{linesep}".format(
                word=as_text(w),
                phoneme=as_text(p),
                linesep=os.linesep))
    return ''.join(chunks)
139 
140 
def make_dfa(grammar, voca):
    """Compile grammar and vocabulary text with julius' ``mkdfa.pl``.

    *grammar* and *voca* are written to ``data.grammar`` / ``data.voca`` in a
    fresh temporary directory, ``rosrun julius mkdfa.pl data`` is run there,
    and the generated ``data.dfa`` / ``data.dict`` files are read back.  The
    temporary directory is removed before returning, whatever the outcome.

    Args:
        grammar: text of the ``.grammar`` file.
        voca: text of the ``.voca`` file.

    Returns:
        A ``(dfa, dic)`` tuple with the contents of the generated files, or
        ``None`` when the script output does not contain ``generated:``.
    """
    # Local import: only needed here, for cleaning up the temporary directory.
    import shutil

    name = "data"
    temp_dir = tempfile.mkdtemp(prefix="mkdfa")
    rospy.logdebug("created temp dir: %s" % temp_dir)
    try:
        with open(os.path.join(temp_dir, "{name}.grammar".format(name=name)), "w") as f:
            f.write(grammar)
        with open(os.path.join(temp_dir, "{name}.voca".format(name=name)), "w") as f:
            f.write(voca)

        cmd = ["rosrun", "julius", "mkdfa.pl", name]
        rospy.logdebug("Executing %s" % cmd)
        popen_kwargs = dict(stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE,
                            cwd=temp_dir)
        if sys.version_info.major >= 3:
            # Text-mode pipes, so the ``in`` test below works on str.
            popen_kwargs["encoding"] = 'utf8'
        p = subprocess.Popen(cmd, **popen_kwargs)
        result, error = p.communicate(temp_dir)
        rospy.logdebug("STDOUT: %s" % result)
        rospy.logdebug("STDERR: %s" % error)

        if "generated:" not in result:
            rospy.logerr("Failed to compile grammar to DFA: %s" % error.strip())
            return None

        with open(os.path.join(temp_dir, "{name}.dfa".format(name=name)), "r") as f:
            dfa = f.read()
        with open(os.path.join(temp_dir, "{name}.dict".format(name=name)), "r") as f:
            dic = f.read()
        return dfa, dic
    finally:
        # mkdtemp() leaves deletion to the caller; without this every call
        # would leave a directory behind.
        shutil.rmtree(temp_dir, ignore_errors=True)
178 
179 
if __name__ == '__main__':
    # Smoke test: requires a ROS environment in which
    # ``rosrun julius yomi2voca.pl`` can be executed.
    phonemes = make_phonemes_from_words(["うどん", "そば"])
    expected = ("u d o N", "s o b a")
    for index, want in enumerate(expected):
        assert phonemes[index] == want
def readlines_with_utf8(path)
Definition: utils.py:29
def make_phonemes_from_words(words)
Definition: utils.py:75
def load_grammar(path, name)
Definition: utils.py:37
def make_dfa(grammar, voca)
Definition: utils.py:141
def is_hiragana(s)
Definition: utils.py:25
def make_grammar_from_rules(rules)
Definition: utils.py:99
def make_voca_from_categories(cats, vocas)
Definition: utils.py:118


julius_ros
Author(s): Yuki Furuta
autogenerated on Tue May 11 2021 02:55:36