test_rospeex_sr_stream.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # -*- coding: utf-8 -*-
00003 
00004 # import ros packages
00005 import rospy
00006 import unittest
00007 import rostest
00008 
00009 # Import other libraries
00010 from rospeex_if import ROSpeexInterface
00011 from rospeex_core.sr.base.session import PacketType
00012 
00013 # import python libraries
00014 import os
00015 import sys
00016 
00017 # import ROS messages
00018 import std_msgs.msg
00019 from rospeex_msgs.msg import SignalProcessingStream
00020 from rospeex_msgs.msg import SignalProcessingResponse
00021 
00022 class TestRospeexSR(unittest.TestCase):
00023     """ Speech Recognition Test class """
00024     def setUp(self):
00025         self._SPI_RESPONSE_TOPIC_NAME = 'spi_res'
00026         self._SPI_STREAM_TOPIC_NAME = 'spi_stream'
00027         self._message = ''
00028         self._ready = 1
00029         self._interface = None
00030         self._wait_flag = True
00031         self._wait_count = 0
00032 
00033         self._sr_req_id = 0
00034 
00035     def _sr_response(self, message):
00036         """
00037         speech synthesis response callback
00038 
00039         @param message: message class
00040         @type  message: str
00041         """
00042         self._message = message
00043         self._ready = 1
00044         rospy.loginfo('Message Received.')
00045         rospy.logdebug(self._message.decode('utf-8').encode('utf-8'))
00046         self._wait_count -= 1
00047 
00048     def _read_input_data(self, target_file):
00049         """
00050         read input data
00051 
00052         @param target_file: target test data
00053         @type  target_file: str
00054         """
00055         data = None
00056         with open(target_file, 'rb') as f_log:
00057             data = f_log.read()
00058 
00059         if data is None:
00060             rospy.logwarn('[%s] is not found.' % target_file)
00061         return data
00062 
00063     def _execute_case(self, voice_data, target_lang, target_engine):
00064         """
00065         execute test case
00066 
00067         @param voice data: voice data (binary wav)
00068         @type  voice data: str
00069         @param target_lang: recognition ranguage
00070         @type  target_lang: str
00071         @param target_engine: target speech recognition engine
00072         @type  target_engine: str (nict | google | microsoft)
00073         """
00074         rospy.sleep(5) # wait for rospeex sr node
00075         self._ready = 0
00076 
00077         self._send_spi_stream('', PacketType.START)
00078         self._send_spi_stream(voice_data, PacketType.DATA)
00079         self._send_spi_stream('', PacketType.END)
00080 
00081         self._send_spi_response(voice_data)
00082         self._wait_count += 1
00083 
00084         while not self._ready and self._wait_flag:
00085             rospy.sleep(0.1)  # wait until callback response
00086 
00087     def _send_spi_stream(self, voice_data, packet_type):
00088         """
00089         publish /spi_stream topic
00090 
00091         @param voice_data: voice data (binary wav)
00092         @type  voice_data: str
00093         @param packet_type: send packet type (PacketType)
00094         @type  packet_type: integer
00095         """
00096         msg = SignalProcessingStream()
00097         msg.header = std_msgs.msg.Header()
00098         msg.header.stamp = rospy.Time.now()
00099         msg.header.seq = self._sr_req_id
00100         msg.packet_type = packet_type
00101         msg.packet_data = voice_data
00102         self._pub_spi_stream.publish(msg)
00103         self._sr_req_id += 1
00104 
00105     def _send_spi_response(self, voice_data):
00106         """
00107         publish /spi_response topic
00108 
00109         @param voice_data: voice data (binary wav)
00110         @type  voice_data: str
00111         """
00112         msg = SignalProcessingResponse()
00113         msg.header.user = rospy.get_name()
00114         msg.header.request_id = str(self._sr_req_id)
00115         msg.data = voice_data
00116         self._pub_spi.publish(msg)
00117         self._sr_req_id += 1
00118 
00119     def _dummy(self):
00120         return 1
00121 
00122     def test_run(self):
00123         self._ready = 1
00124         rospy.init_node('test_rospeex_sr')
00125         self._interface = ROSpeexInterface()
00126         self._interface.init(ss=False, sr=True, spi=True)
00127         self._interface.register_sr_response(self._sr_response)
00128         rospy.sleep(1)
00129 
00130         target_dir = rospy.get_param('~target_dir', '../testdata')
00131         target_engine = rospy.get_param('~target_engine', 'nict')
00132         target_engine_list = list(rospy.get_param('~target_engine_list', ''))
00133         target_language = rospy.get_param('~target_language', 'ja')
00134         target_files = list(rospy.get_param('~target_files', ''))
00135         expected_results = list(rospy.get_param('~expected_results', ''))
00136         self._wait_flag = rospy.get_param('~wait_flag', True)
00137 
00138         self._pub_spi = rospy.Publisher(
00139             self._SPI_RESPONSE_TOPIC_NAME,
00140             SignalProcessingResponse,
00141             queue_size=10
00142         )
00143 
00144         self._pub_spi_stream = rospy.Publisher(
00145             self._SPI_STREAM_TOPIC_NAME,
00146             SignalProcessingStream,
00147             queue_size=100
00148         )
00149 
00150         if len(target_engine_list) == 0:
00151             target_engine_list.append(target_engine)
00152 
00153         for engine in target_engine_list:
00154             cnt = 0
00155             if not rospy.is_shutdown():
00156                 for target_file in target_files:
00157                     self._interface.set_spi_config(language=target_language, engine=engine)
00158                     rospy.loginfo(target_file)
00159                     filename = os.path.join(target_dir, target_file)
00160                     voice_data = self._read_input_data(filename)
00161                     if voice_data:
00162                         self._execute_case(voice_data, target_language, engine)
00163 
00164                         res = self._message.decode('utf-8').encode('utf-8')
00165                         msg = expected_results[cnt].encode('utf-8')
00166                         self.assertEqual(res, msg, 'Speech recognition failed.')
00167                     else:
00168                         rospy.logerror('invalid voice data')
00169                     
00170                     cnt += 1
00171 
00172         while self._wait_count > 0:
00173             rospy.sleep(0.1)
00174 
00175 
00176 if __name__ == '__main__':
00177     rostest.rosrun('rospeex_if', 'rospeex_sr_test', TestRospeexSR, sys.argv)


rospeex_if
Author(s): Komei Sugiura
autogenerated on Thu Jun 6 2019 18:53:13