Go to the documentation of this file.00001
00002
00003
00004
00005 import rospy
00006 import unittest
00007 import rostest
00008
00009
00010 from rospeex_if import ROSpeexInterface
00011 from rospeex_core.sr.base.session import PacketType
00012
00013
00014 import os
00015 import sys
00016
00017
00018 import std_msgs.msg
00019 from rospeex_msgs.msg import SignalProcessingStream
00020 from rospeex_msgs.msg import SignalProcessingResponse
00021
00022 class TestRospeexSR(unittest.TestCase):
00023 """ Speech Recognition Test class """
00024 def setUp(self):
00025 self._SPI_RESPONSE_TOPIC_NAME = 'spi_res'
00026 self._SPI_STREAM_TOPIC_NAME = 'spi_stream'
00027 self._message = ''
00028 self._ready = 1
00029 self._interface = None
00030 self._wait_flag = True
00031 self._wait_count = 0
00032
00033 self._sr_req_id = 0
00034
00035 def _sr_response(self, message):
00036 """
00037 speech synthesis response callback
00038
00039 @param message: message class
00040 @type message: str
00041 """
00042 self._message = message
00043 self._ready = 1
00044 rospy.loginfo('Message Received.')
00045 rospy.logdebug(self._message.decode('utf-8').encode('utf-8'))
00046 self._wait_count -= 1
00047
00048 def _read_input_data(self, target_file):
00049 """
00050 read input data
00051
00052 @param target_file: target test data
00053 @type target_file: str
00054 """
00055 data = None
00056 with open(target_file, 'rb') as f_log:
00057 data = f_log.read()
00058
00059 if data is None:
00060 rospy.logwarn('[%s] is not found.' % target_file)
00061 return data
00062
00063 def _execute_case(self, voice_data, target_lang, target_engine):
00064 """
00065 execute test case
00066
00067 @param voice data: voice data (binary wav)
00068 @type voice data: str
00069 @param target_lang: recognition ranguage
00070 @type target_lang: str
00071 @param target_engine: target speech recognition engine
00072 @type target_engine: str (nict | google | microsoft)
00073 """
00074 rospy.sleep(5)
00075 self._ready = 0
00076
00077 self._send_spi_stream('', PacketType.START)
00078 self._send_spi_stream(voice_data, PacketType.DATA)
00079 self._send_spi_stream('', PacketType.END)
00080
00081 self._send_spi_response(voice_data)
00082 self._wait_count += 1
00083
00084 while not self._ready and self._wait_flag:
00085 rospy.sleep(0.1)
00086
00087 def _send_spi_stream(self, voice_data, packet_type):
00088 """
00089 publish /spi_stream topic
00090
00091 @param voice_data: voice data (binary wav)
00092 @type voice_data: str
00093 @param packet_type: send packet type (PacketType)
00094 @type packet_type: integer
00095 """
00096 msg = SignalProcessingStream()
00097 msg.header = std_msgs.msg.Header()
00098 msg.header.stamp = rospy.Time.now()
00099 msg.header.seq = self._sr_req_id
00100 msg.packet_type = packet_type
00101 msg.packet_data = voice_data
00102 self._pub_spi_stream.publish(msg)
00103 self._sr_req_id += 1
00104
00105 def _send_spi_response(self, voice_data):
00106 """
00107 publish /spi_response topic
00108
00109 @param voice_data: voice data (binary wav)
00110 @type voice_data: str
00111 """
00112 msg = SignalProcessingResponse()
00113 msg.header.user = rospy.get_name()
00114 msg.header.request_id = str(self._sr_req_id)
00115 msg.data = voice_data
00116 self._pub_spi.publish(msg)
00117 self._sr_req_id += 1
00118
00119 def _dummy(self):
00120 return 1
00121
00122 def test_run(self):
00123 self._ready = 1
00124 rospy.init_node('test_rospeex_sr')
00125 self._interface = ROSpeexInterface()
00126 self._interface.init(ss=False, sr=True, spi=True)
00127 self._interface.register_sr_response(self._sr_response)
00128 rospy.sleep(1)
00129
00130 target_dir = rospy.get_param('~target_dir', '../testdata')
00131 target_engine = rospy.get_param('~target_engine', 'nict')
00132 target_engine_list = list(rospy.get_param('~target_engine_list', ''))
00133 target_language = rospy.get_param('~target_language', 'ja')
00134 target_files = list(rospy.get_param('~target_files', ''))
00135 expected_results = list(rospy.get_param('~expected_results', ''))
00136 self._wait_flag = rospy.get_param('~wait_flag', True)
00137
00138 self._pub_spi = rospy.Publisher(
00139 self._SPI_RESPONSE_TOPIC_NAME,
00140 SignalProcessingResponse,
00141 queue_size=10
00142 )
00143
00144 self._pub_spi_stream = rospy.Publisher(
00145 self._SPI_STREAM_TOPIC_NAME,
00146 SignalProcessingStream,
00147 queue_size=100
00148 )
00149
00150 if len(target_engine_list) == 0:
00151 target_engine_list.append(target_engine)
00152
00153 for engine in target_engine_list:
00154 cnt = 0
00155 if not rospy.is_shutdown():
00156 for target_file in target_files:
00157 self._interface.set_spi_config(language=target_language, engine=engine)
00158 rospy.loginfo(target_file)
00159 filename = os.path.join(target_dir, target_file)
00160 voice_data = self._read_input_data(filename)
00161 if voice_data:
00162 self._execute_case(voice_data, target_language, engine)
00163
00164 res = self._message.decode('utf-8').encode('utf-8')
00165 msg = expected_results[cnt].encode('utf-8')
00166 self.assertEqual(res, msg, 'Speech recognition failed.')
00167 else:
00168 rospy.logerror('invalid voice data')
00169
00170 cnt += 1
00171
00172 while self._wait_count > 0:
00173 rospy.sleep(0.1)
00174
00175
if __name__ == '__main__':
    # Script entry point: hand the test case class to rostest together
    # with the command-line arguments.
    rostest.rosrun(
        'rospeex_if',
        'rospeex_sr_test',
        TestRospeexSR,
        sys.argv
    )