00001
00002
00003
00004
00005 import os
00006 import struct
00007 import subprocess
00008 import traceback
00009 import threading
00010
00011
00012 from rospeex_core.validators import accepts
00013 from rospeex_core.exceptions import SignalProcessingInterfaceException
00014 from rospeex_core import logging_util
00015 from rospeex_msgs.msg import SignalProcessingStream
00016
00017
00018 logger = logging_util.get_logger(__name__)
00019
00020
class WaveFile(object):
    """
    Wave file converter class.

    Builds an in-memory RIFF/WAVE file from 16-bit sample values. The header
    is fixed to: format tag 1, 1 channel, 16000 samples/s, 32000 bytes/s,
    block align 2, 16 bits per sample.
    """

    # Field values of the 'fmt ' chunk, in the order they are packed.
    _FMT_CHUNK_SIZE = 16
    _FORMAT_TAG = 1
    _CHANNELS = 1
    _SAMPLE_RATE = 16000
    _BYTE_RATE = 32000
    _BLOCK_ALIGN = 2
    _BITS_PER_SAMPLE = 16
    _BYTES_PER_SAMPLE = 2

    def __init__(self):
        """ initialize function """
        self._data = b''
        self._header = b''
        # Not used inside this class; kept so the attribute set is unchanged.
        self._count = 0

    def build(self, data):
        """
        build wave file
        @param data: voice samples, each an integer in the range 0..65535
        @type  data: sequence of int
        @return: wave file content (header followed by the samples)
        @rtype: str (bytes)
        @raise struct.error: a sample does not fit an unsigned 16-bit value
        """
        sample_count = len(data)
        self._create_header(sample_count)
        # '<' forces little-endian, which is what the RIFF container requires
        # regardless of the byte order of the host.
        payload = struct.pack('<%dH' % sample_count, *data)
        self._data = self._header + payload
        return self._data

    def _create_header(self, data_size):
        """
        create wave file header and store it in self._header
        @param data_size: number of 16-bit samples in the data part
        @type  data_size: int
        """
        payload_size = data_size * self._BYTES_PER_SAMPLE

        fmt_chunk = struct.pack(
            '<4sI2H2I2H',
            b'fmt ',
            self._FMT_CHUNK_SIZE,
            self._FORMAT_TAG,
            self._CHANNELS,
            self._SAMPLE_RATE,
            self._BYTE_RATE,
            self._BLOCK_ALIGN,
            self._BITS_PER_SAMPLE
        )
        data_chunk = struct.pack('<4sI', b'data', payload_size)

        # RIFF size field = everything after the 8-byte 'RIFF'+size prefix:
        # the 4-byte 'WAVE' tag, both chunk headers and the sample payload.
        file_size = 4 + len(fmt_chunk) + len(data_chunk) + payload_size
        wav_header = struct.pack('<4sI4s', b'RIFF', file_size, b'WAVE')

        self._header = wav_header + fmt_chunk + data_chunk
00074
00075
class Connector(object):
    """
    AppConnector class.

    Starts the controller executable as a child process and exchanges data
    with it through the child's stdin / stdout pipes.
    """

    def __init__(
        self,
        app_dir,
        controller,
        nict_mmcv,
        mmse_setting,
        vad_setting,
        ip_addr,
        recv_port,
        send_port,
        log_level
    ):
        """
        initialize function
        @param app_dir: application directory; also used as the working
                        directory of the child process
        @type  app_dir: str
        @param controller: controller executable, relative to app_dir
        @type  controller: str
        @param nict_mmcv: path passed with '-b', relative to app_dir
        @type  nict_mmcv: str
        @param mmse_setting: path passed with '-m', relative to app_dir
        @type  mmse_setting: str
        @param vad_setting: path passed with '-v', relative to app_dir
        @type  vad_setting: str
        @param ip_addr: value passed with '-i'
        @type  ip_addr: str
        @param recv_port: value passed with '-c' (converted with str())
        @type  recv_port: int
        @param send_port: value passed with '-s' (converted with str())
        @type  send_port: int
        @param log_level: value passed with '-l'
        @type  log_level: str
        """
        self._app_dir = app_dir
        self._controller = os.path.join(app_dir, controller)
        self._nict_mmcv = os.path.join(app_dir, nict_mmcv)
        self._mmse_setting = os.path.join(app_dir, mmse_setting)
        self._vad_setting = os.path.join(app_dir, vad_setting)
        self._cwd = app_dir
        self._ip_addr = ip_addr
        self._recv_port = recv_port
        self._send_port = send_port
        self._log_level = log_level
        self._process = None

    def _require_process(self):
        """
        return the child process object
        @return: launched process
        @rtype: subprocess.Popen
        @raise SignalProcessingInterfaceException:
                NICTmmcvController is not launched
        """
        if not self._process:
            raise SignalProcessingInterfaceException(
                'NICTmmcvController is not launched.'
            )
        return self._process

    def launch(self):
        """
        launch application
        @raise SignalProcessingInterfaceException:
                NICTmmcvController is not installed.
        """
        args = [
            self._controller,
            '-i', self._ip_addr,
            '-m', self._mmse_setting,
            '-v', self._vad_setting,
            '-b', self._nict_mmcv,
            '-l', self._log_level,
            '-s', str(self._send_port),
            '-c', str(self._recv_port)
        ]

        devnull = open(os.devnull, 'w')
        try:
            self._process = subprocess.Popen(
                args,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=devnull,
                cwd=self._cwd,
                close_fds=True
            )

        except (OSError, ValueError) as err:
            # Build a single message string, like every other raise in
            # this file, instead of passing the format and value separately.
            raise SignalProcessingInterfaceException(
                'NICTmmcvController is not installed. Exception:%s' % str(err)
            )

        except Exception:
            raise SignalProcessingInterfaceException(
                'unknown exception. Traceback: %s' % traceback.format_exc()
            )

        finally:
            # The child holds its own copy of the descriptor once Popen has
            # returned, so the parent's handle must not be left open.
            devnull.close()

    def read(self, data_size):
        """
        read data from the stdout of the child process
        @param data_size: read data size
        @type  data_size: integer
        @return: read data
        @rtype: str
        @raise SignalProcessingInterfaceException:
                NICTmmcvController is not launched
        """
        return self._require_process().stdout.read(data_size)

    def write(self, data):
        """
        write data to the stdin of the child process and flush it
        @param data: write data
        @type  data: str
        @raise SignalProcessingInterfaceException:
                NICTmmcvController is not launched
        """
        stdin = self._require_process().stdin
        stdin.write(data)
        stdin.flush()

    def poll(self):
        """
        polling process
        @return: None while the process is running, otherwise its exit code
        @rtype: int or None
        @raise SignalProcessingInterfaceException:
                NICTmmcvController is not launched
        """
        return self._require_process().poll()

    def shutdown(self):
        """ shutdown application (does nothing when it is not launched) """
        process = self._process
        if process is None:
            return

        try:
            # Only signal a child that is still alive: kill() on an already
            # reaped process can raise OSError on Python 2.
            if process.poll() is None:
                process.kill()
            process.wait()

        except OSError as err:
            # The child disappeared between poll() and kill(); there is
            # nothing left to clean up.
            logger.debug('process already terminated: %s', str(err))

        finally:
            self._process = None
00216
00217
class FrameSyncHeader(object):
    """
    Frame Sync Header class.

    Holds the integer codes that can appear as the header of a frame sync
    packet, plus helpers to validate a code and to get its name.
    INIT is used in this file as the state before any packet was received.
    """
    INIT = -1
    DATA_MARK = 0
    START_MARK = 1
    END_MARK = 2
    TOF_MARK = 9
    EOF_MARK = 10

    # code -> constant name, used by is_valid() and to_str()
    __HEADER_TO_STR = {
        INIT: 'INIT',
        DATA_MARK: 'DATA_MARK',
        START_MARK: 'START_MARK',
        END_MARK: 'END_MARK',
        TOF_MARK: 'TOF_MARK',
        EOF_MARK: 'EOF_MARK',
    }

    @classmethod
    def is_valid(cls, header):
        """
        check whether a value is one of the known header codes
        @param header: header code
        @type  header: int
        @return: True if the code is known, otherwise False
        @rtype: bool
        """
        return header in cls.__HEADER_TO_STR

    @classmethod
    def to_str(cls, header):
        """
        convert a header code to its name
        @param header: header code
        @type  header: int
        @return: constant name, or 'UNKNOWN' for an unknown code
        @rtype: str
        """
        return cls.__HEADER_TO_STR.get(header, 'UNKNOWN')
00249
00250
class ConnectorInterface(threading.Thread):
    """
    application controller interface class.

    Thread that launches the controller through Connector, reads frame sync
    packets from it and forwards the voice data to the registered callbacks.
    """
    @accepts(
        app_dir=basestring,
        controller=basestring,
        nict_mmcv=basestring,
        mmse_setting=basestring,
        vad_setting=basestring,
        ip_addr=basestring,
        recv_port=int,
        send_port=int
    )
    def __init__(
        self,
        app_dir,
        controller,
        nict_mmcv,
        mmse_setting,
        vad_setting,
        ip_addr,
        recv_port,
        send_port,
        log_level
    ):
        """
        initialize function

        @param app_dir: application directory
        @type  app_dir: str
        @param controller: controller name
        @type  controller: str
        @param nict_mmcv: mmcv application path
        @type  nict_mmcv: str
        @param mmse_setting: mmse setting file path
        @type  mmse_setting: str
        @param vad_setting: vad setting file path
        @type  vad_setting: str
        @param ip_addr: controller ip address
        @type  ip_addr: str
        @param recv_port: data receive port
        @type  recv_port: int
        @param send_port: data send port
        @type  send_port: int
        @param log_level: loglevel
        @type  log_level: str
        """
        super(ConnectorInterface, self).__init__()
        self._connector = Connector(
            app_dir, controller, nict_mmcv, mmse_setting,
            vad_setting, ip_addr, recv_port, send_port, log_level
        )
        self._stop_request = threading.Event()
        self._callback = None
        self._voice_data = []
        self._stream_callback = None

        # Packet layout: 4-byte header followed by 320 bytes of payload.
        self.FRAME_SYNC_DATA_SIZE = 320
        self.FRAME_SYNC_HEADER_SIZE = 4
        self.STREAMING_SEND_NUM = 5
        self.STREAMING_DATA_SIZE = (
            self.STREAMING_SEND_NUM * self.FRAME_SYNC_DATA_SIZE
        )
        self.FRAME_SYNC_SIZE = (
            self.FRAME_SYNC_HEADER_SIZE + self.FRAME_SYNC_DATA_SIZE
        )

    def join(self, timeout=None):
        """
        request the thread to stop and wait for it
        @param timeout: maximum wait time, None to wait until it finishes
        @type  timeout: float or None
        """
        # run() only tests the flag between two packets; it does not
        # interrupt a read that is already in progress.
        self._stop_request.set()
        super(ConnectorInterface, self).join(timeout)

    def register_callback(self, callback):
        """
        register the function that receives a wave file per utterance
        @param callback: wav file callback, called as callback(wav_data)
        @type  callback: function
        """
        self._callback = callback

    def register_stream_callback(self, callback):
        """
        register the function that receives the streaming packets
        @param callback: called as callback(packet_type, packet_data)
        @type  callback: function
        """
        self._stream_callback = callback

    def set_play_sound_state(self, state):
        """
        set play sound state
        @param state: play sound state
        @type  state: True for playing sound.
                      False for NOT playing sound.
        """
        msg = 'play_sound_on\n\n' if state else 'play_sound_off\n\n'
        self._connector.write(msg)

    def _send_streaming_data(self, packet_type, packet_data=''):
        """
        forward one packet to the stream callback, if one is registered
        @param packet_type: packet type (SignalProcessingStream constant)
        @type  packet_type: int
        @param packet_data: packed voice data, empty for START / END
        @type  packet_data: str
        """
        if self._stream_callback:
            self._stream_callback(packet_type, packet_data)

    def _split_framesync_packet(self, recv_data):
        """
        split a frame sync packet into header and voice samples
        @param recv_data: raw packet
        @type  recv_data: str
        @return: (header, data). header is None when the packet does not
                 have exactly FRAME_SYNC_SIZE bytes; data is a tuple of
                 samples for a DATA_MARK packet and None otherwise.
        @rtype: tuple
        """
        header = None
        data = None
        if len(recv_data) == self.FRAME_SYNC_SIZE:
            raw_header = recv_data[:self.FRAME_SYNC_HEADER_SIZE]
            header = struct.unpack('<I', raw_header)[0]

            if header == FrameSyncHeader.DATA_MARK:
                # Two bytes per sample; '//' keeps the count an integer.
                sample_count = self.FRAME_SYNC_DATA_SIZE // 2
                data = struct.unpack(
                    '<%dH' % sample_count,
                    recv_data[self.FRAME_SYNC_HEADER_SIZE:self.FRAME_SYNC_SIZE]
                )

        return header, data

    def _send_streaming_data_packet(self, streaming_data):
        """
        pack the buffered samples and send them as one DATA packet
        @param streaming_data: voice samples; nothing is sent when empty
        @type  streaming_data: list of int
        """
        if streaming_data:
            # Little-endian, matching how the samples were unpacked in
            # _split_framesync_packet().
            fmt = '<{}H'.format(len(streaming_data))
            pack_data = struct.pack(fmt, *streaming_data)
            self._send_streaming_data(
                SignalProcessingStream.DATA,
                pack_data
            )

    def run(self):
        """
        thread main
        @raise SignalProcessingInterfaceException:
                the controller terminated, or an invalid / incomplete
                packet was received
        """
        streaming_data = []
        audio_state = FrameSyncHeader.INIT

        # States in which voice data is accepted and END is forwarded.
        in_utterance = (
            FrameSyncHeader.START_MARK,
            FrameSyncHeader.DATA_MARK
        )

        self._connector.launch()

        try:
            while not self._stop_request.is_set():
                # poll() is None while the child runs. An exit status of 0
                # is falsy but still means the child has terminated.
                if self._connector.poll() is not None:
                    msg = 'NICTmmcvController is terminated.'
                    raise SignalProcessingInterfaceException(msg)

                recv_data = self._connector.read(self.FRAME_SYNC_SIZE)
                header, voice_data = self._split_framesync_packet(recv_data)

                if header == FrameSyncHeader.DATA_MARK:
                    # Data received outside of an utterance is dropped.
                    if audio_state in in_utterance:
                        streaming_data.extend(voice_data)
                        self._voice_data.extend(voice_data)
                        # NOTE(review): the threshold is derived from
                        # FRAME_SYNC_DATA_SIZE (a byte count) but compared
                        # with a number of samples - confirm the intent.
                        if len(streaming_data) > self.STREAMING_DATA_SIZE:
                            self._send_streaming_data_packet(streaming_data)
                            streaming_data = []
                        audio_state = FrameSyncHeader.DATA_MARK

                elif header == FrameSyncHeader.START_MARK:
                    logger.debug('START_MARK received')
                    self._send_streaming_data(SignalProcessingStream.START)
                    audio_state = FrameSyncHeader.START_MARK

                elif header == FrameSyncHeader.END_MARK:
                    logger.debug('END_MARK received')

                    # NOTE(review): the nesting of this branch was
                    # reconstructed; it assumes END and the wav callback are
                    # only emitted for an utterance that was started.
                    if audio_state in in_utterance:
                        # flush the samples that are still buffered
                        self._send_streaming_data_packet(streaming_data)
                        streaming_data = []

                        self._send_streaming_data(SignalProcessingStream.END)

                        if self._callback:
                            wav = WaveFile()
                            wav_data = wav.build(self._voice_data)
                            self._callback(wav_data)

                    self._voice_data = []
                    audio_state = FrameSyncHeader.END_MARK

                elif header == FrameSyncHeader.TOF_MARK:
                    logger.debug('TOF_MARK received')
                    audio_state = FrameSyncHeader.END_MARK

                elif header == FrameSyncHeader.EOF_MARK:
                    logger.debug('EOF_MARK received')
                    audio_state = FrameSyncHeader.END_MARK

                else:
                    msg = 'Invalid header: {}'.format(header)
                    raise SignalProcessingInterfaceException(msg)

        finally:
            # Always stop the child, also when the loop ends with an
            # exception, so the process is not left running.
            try:
                self._connector.shutdown()
            except OSError as err:
                # The child was already gone; keep the original exception.
                logger.debug('shutdown failed: %s', str(err))