00001
00002
00003
00004
00005 from cStringIO import StringIO
00006 import os
00007 import re
00008 import rospkg
00009 import rospy
00010 import subprocess
00011 import tempfile
00012 from speech_recognition_msgs.msg import Grammar
00013 from speech_recognition_msgs.msg import PhraseRule
00014 from speech_recognition_msgs.msg import Vocabulary
00015
00016
00017 _REGEX_HIRAGANA = re.compile(r'^(?:\xE3\x81[\x81-\xBF]|\xE3\x82[\x80-\x93])+$')
00018
00019
00020 def is_hiragana(s):
00021 return _REGEX_HIRAGANA.search(s) is not None
00022
00023
00024 def readlines_with_utf8(path):
00025 assert os.path.exists(path)
00026 return subprocess.check_output(["nkf", "-w", path]).split(os.linesep)
00027
00028
00029 def load_grammar(path, name):
00030 assert os.path.isdir(path)
00031 g = Grammar()
00032 grammar_path = os.path.join(path, "%s.grammar" % name)
00033 for l in readlines_with_utf8(grammar_path):
00034 l = l.strip()
00035 if l:
00036 sym, defi = l.strip().split(':')
00037 r = PhraseRule()
00038 r.symbol = sym.strip()
00039 r.definition = [d.strip() for d in defi.split()]
00040 g.rules.append(r)
00041 voca_path = os.path.join(path, "%s.voca" % name)
00042 voca = None
00043 for l in readlines_with_utf8(voca_path):
00044 l = l.strip()
00045 if l.startswith('%'):
00046 g.categories.append(l[1:].strip())
00047 if voca:
00048 g.vocabularies.append(voca)
00049 voca = Vocabulary()
00050 elif l:
00051 sp = l.strip().split()
00052 voca.words.append(sp[0])
00053 voca.phonemes.append(' '.join(sp[1:]))
00054 if voca:
00055 g.vocabularies.append(voca)
00056 return g
00057
00058
00059 def make_phonemes_from_words(words):
00060 cmd = ["rosrun", "julius", "yomi2voca.pl"]
00061 stdin = os.linesep.join(["%s %s" % (w, w) for w in words]) + os.linesep
00062 rospy.logdebug("Executing %s" % cmd)
00063 p = subprocess.Popen(["rosrun", "julius", "yomi2voca.pl"],
00064 stdin=subprocess.PIPE,
00065 stdout=subprocess.PIPE,
00066 stderr=subprocess.PIPE)
00067 result, error = p.communicate(unicode(stdin, 'utf-8').encode('euc-jp'))
00068 rospy.logdebug("STDOUT: %s" % result)
00069 rospy.logdebug("STDERR: %s" % error)
00070
00071 if error and "Error:" in error:
00072 error = unicode(error, 'euc-jp').encode('utf-8')
00073 rospy.logerr("Error: %s" % error)
00074 return None
00075
00076 result = unicode(result, 'euc-jp').encode('utf-8')
00077 result = result.split(os.linesep)[:-1]
00078 result = [r.split("\t")[1] for r in result]
00079
00080 return result
00081
00082
00083 def make_grammar_from_rules(rules):
00084 ss = StringIO()
00085 for r in rules:
00086 ss.write("{symbol}: {definition}{linesep}".format(
00087 symbol=r.symbol,
00088 definition=" ".join(r.definition),
00089 linesep=os.linesep))
00090 return ss.getvalue()
00091
00092
00093 def make_voca_from_categories(cats, vocas):
00094 ss = StringIO()
00095 for c, vs in zip(cats, vocas):
00096 ss.write("% {category}{linesep}".format(
00097 category=c,
00098 linesep=os.linesep))
00099 phonemes = vs.phonemes
00100 if len(phonemes) == 0:
00101 phonemes = make_phonemes_from_words(vs.words)
00102 for w, p in zip(vs.words, phonemes):
00103 ss.write("{word}\t{phoneme}{linesep}{linesep}".format(
00104 word=w,
00105 phoneme=p,
00106 linesep=os.linesep))
00107 return ss.getvalue()
00108
00109
00110 def make_dfa(grammar, voca):
00111 name = "data"
00112 temp_dir = tempfile.mkdtemp(prefix="mkdfa")
00113 rospy.logdebug("created temp dir: %s" % temp_dir)
00114 with open(os.path.join(temp_dir, "{name}.grammar".format(name=name)), "w") as f:
00115 f.write(grammar)
00116 with open(os.path.join(temp_dir, "{name}.voca".format(name=name)), "w") as f:
00117 f.write(voca)
00118
00119 cmd = ["rosrun", "julius", "mkdfa.pl", name]
00120 rospy.logdebug("Executing %s" % cmd)
00121 p = subprocess.Popen(cmd,
00122 stdin=subprocess.PIPE,
00123 stdout=subprocess.PIPE,
00124 stderr=subprocess.PIPE,
00125 cwd=temp_dir)
00126 result, error = p.communicate(temp_dir)
00127 rospy.logdebug("STDOUT: %s" % result)
00128 rospy.logdebug("STDERR: %s" % error)
00129
00130 if "generated:" not in result:
00131 rospy.logerr("Failed to compile grammar to DFA: %s" % error.strip())
00132 return None
00133
00134 with open(os.path.join(temp_dir, "{name}.dfa".format(name=name)), "r") as f:
00135 dfa = f.read()
00136 with open(os.path.join(temp_dir, "{name}.dict".format(name=name)), "r") as f:
00137 dic = f.read()
00138 return dfa, dic
00139
00140
00141 if __name__ == '__main__':
00142 result = make_phonemes_from_words(["うどん", "そば"])
00143 assert result[0] == "u d o N"
00144 assert result[1] == "s o b a"