7 from cStringIO
import StringIO
9 from io
import StringIO
17 from speech_recognition_msgs.msg
import Grammar
18 from speech_recognition_msgs.msg
import PhraseRule
19 from speech_recognition_msgs.msg
import Vocabulary
22 _REGEX_HIRAGANA = re.compile(
r'^(?:\xE3\x81[\x81-\xBF]|\xE3\x82[\x80-\x93])+$')
26 return _REGEX_HIRAGANA.search(s)
is not None 30 assert os.path.exists(path)
32 if sys.version_info.major >= 3:
34 return subprocess.check_output([
"nkf",
"-w", path]).split(sep)
38 assert os.path.isdir(path)
40 grammar_path = os.path.join(path,
"%s.grammar" % name)
42 if sys.version_info.major >= 3:
47 sym, defi = l.strip().split(sep)
49 r.symbol = sym.strip()
50 r.definition = [d.strip()
for d
in defi.split()]
52 voca_path = os.path.join(path,
"%s.voca" % name)
56 if sys.version_info.major >= 3:
62 g.categories.append(l[1:].strip())
64 g.vocabularies.append(voca)
67 sp = l.strip().split()
68 voca.words.append(sp[0])
69 voca.phonemes.append(spc.join(sp[1:]))
71 g.vocabularies.append(voca)
76 cmd = [
"rosrun",
"julius",
"yomi2voca.pl"]
77 stdin = os.linesep.join([
"%s %s" % (w, w)
for w
in words]) + os.linesep
78 rospy.logdebug(
"Executing %s" % cmd)
79 p = subprocess.Popen([
"rosrun",
"julius",
"yomi2voca.pl"],
80 stdin=subprocess.PIPE,
81 stdout=subprocess.PIPE,
82 stderr=subprocess.PIPE)
83 result, error = p.communicate(unicode(stdin,
'utf-8').encode(
'euc-jp'))
84 rospy.logdebug(
"STDOUT: %s" % result)
85 rospy.logdebug(
"STDERR: %s" % error)
87 if error
and "Error:" in error:
88 error = unicode(error,
'euc-jp').encode(
'utf-8')
89 rospy.logerr(
"Error: %s" % error)
92 result = unicode(result,
'euc-jp').encode(
'utf-8')
93 result = result.split(os.linesep)[:-1]
94 result = [r.split(
"\t")[1]
for r
in result]
103 definition = r.definition
104 if type(symbol) == bytes:
105 symbol = symbol.decode()
106 if len(definition) > 0:
107 if type(definition[0]) == bytes:
108 definition = b
' '.join(definition).decode()
110 definition =
' '.join(definition)
111 ss.write(
"{symbol}: {definition}{linesep}".format(
113 definition=definition,
120 for c, vs
in zip(cats, vocas):
123 ss.write(
"% {category}{linesep}".format(
126 phonemes = vs.phonemes
127 if len(phonemes) == 0:
129 for w, p
in zip(vs.words, phonemes):
134 ss.write(
"{word}\t{phoneme}{linesep}{linesep}".format(
143 temp_dir = tempfile.mkdtemp(prefix=
"mkdfa")
144 rospy.logdebug(
"created temp dir: %s" % temp_dir)
145 with open(os.path.join(temp_dir,
"{name}.grammar".format(name=name)),
"w")
as f:
147 with open(os.path.join(temp_dir,
"{name}.voca".format(name=name)),
"w")
as f:
150 cmd = [
"rosrun",
"julius",
"mkdfa.pl", name]
151 rospy.logdebug(
"Executing %s" % cmd)
152 if sys.version_info.major >= 3:
153 p = subprocess.Popen(cmd,
154 stdin=subprocess.PIPE,
155 stdout=subprocess.PIPE,
156 stderr=subprocess.PIPE,
160 p = subprocess.Popen(cmd,
161 stdin=subprocess.PIPE,
162 stdout=subprocess.PIPE,
163 stderr=subprocess.PIPE,
165 result, error = p.communicate(temp_dir)
166 rospy.logdebug(
"STDOUT: %s" % result)
167 rospy.logdebug(
"STDERR: %s" % error)
169 if "generated:" not in result:
170 rospy.logerr(
"Failed to compile grammar to DFA: %s" % error.strip())
173 with open(os.path.join(temp_dir,
"{name}.dfa".format(name=name)),
"r") as f: 175 with open(os.path.join(temp_dir, "{name}.dict".format(name=name)),
"r") as f: 180 if __name__ ==
'__main__':
182 assert result[0] ==
"u d o N" 183 assert result[1] ==
"s o b a" def readlines_with_utf8(path)
def make_phonemes_from_words(words)
def load_grammar(path, name)
def make_dfa(grammar, voca)
def make_grammar_from_rules(rules)
def make_voca_from_categories(cats, vocas)