signal_processing_interface.py
Go to the documentation of this file.
00001 #!/usr/bin/python
00002 # -*- coding: utf-8 -*-
00003 
00004 # import python libraries
00005 import os
00006 import struct
00007 import subprocess
00008 import traceback
00009 import threading
00010 
00011 # import local libraries
00012 from rospeex_core.validators import accepts
00013 from rospeex_core.exceptions import SignalProcessingInterfaceException
00014 from rospeex_core import logging_util
00015 from rospeex_msgs.msg import SignalProcessingStream
00016 
00017 # get logger
00018 logger = logging_util.get_logger(__name__)
00019 
00020 
class WaveFile(object):
    """
    Wave file converter class.

    Wraps a sequence of 16-bit samples in a RIFF/WAVE container with a
    fixed format: PCM (format id 1), 1 channel, 16000 Hz, 16 bits per
    sample. All multi-byte fields are written little-endian, as the WAV
    format requires, independent of the host byte order.
    """

    def __init__(self):
        """ initialize function """
        self._data = b''     # last built file image (header + samples)
        self._header = b''   # last built 44-byte header
        self._count = 0      # kept for compatibility; not used in this class

    def build(self, data):
        """
        build wave file
        @param data: voice data; a sequence of 16-bit sample values, each
                     packed as an unsigned short (0..65535)
        @type  data: sequence of int
        @return: the complete wave file image (header followed by samples)
        @rtype: bytes
        """
        self._create_header(len(data))
        # '<' = little-endian, standard sizes: WAV sample data is always LE
        pack_data = struct.pack('<%dH' % len(data), *data)
        self._data = self._header + pack_data
        return self._data

    def _create_header(self, data_size):
        """
        create wave file header
        @param data_size: number of 16-bit samples in the data part
        @type  data_size: int
        """
        data_bytes = data_size * 2  # 2 bytes per 16-bit mono sample

        # '<' forces little-endian byte order and disables native padding,
        # so the chunks have exactly the layout the WAV format defines.
        fmt_chunk = struct.pack(
            '<4sIHHIIHH',
            b'fmt ',        # chunk_id   = "fmt "
            16,             # chunk_byte = sizeof(fmt_chunk) - 8
            1,              # format_id  = 1 (PCM)
            1,              # channels   = 1
            16000,          # rate       = 16000
            32000,          # velocity   = rate * block_size
            2,              # block_size = 2
            16,             # bits       = 16
        )

        data_chunk = struct.pack(
            '<4sI',
            b'data',        # data chunk id
            data_bytes,     # data size in bytes
        )

        # RIFF size field = whole file size - 8 (the 'RIFF' id and this field)
        file_size = 4 + len(fmt_chunk) + len(data_chunk) + data_bytes
        wav_header = struct.pack(
            '<4sI4s',
            b'RIFF',        # riff header
            file_size,      # filesize - 8
            b'WAVE',        # "WAVE"
        )
        self._header = wav_header + fmt_chunk + data_chunk
00073         self._header = wav_header + fmt_chunk + data_chunk
00074 
00075 
class Connector(object):
    """
    AppConnector class

    Launches the NICTmmcvController executable as a child process and
    exchanges data with it through the child's stdin/stdout pipes.
    """
    def __init__(
        self,
        app_dir,
        controller,
        nict_mmcv,
        mmse_setting,
        vad_setting,
        ip_addr,
        recv_port,
        send_port,
        log_level
    ):
        """
        initialize function
        @param app_dir: application directory; controller, nict_mmcv,
                        mmse_setting and vad_setting are joined onto it,
                        and it is used as the child's working directory
        @type  app_dir: str
        @param controller: controller executable (argv[0] of the child)
        @type  controller: str
        @param nict_mmcv: path passed to the controller as '-b'
        @type  nict_mmcv: str
        @param mmse_setting: path passed to the controller as '-m'
        @type  mmse_setting: str
        @param vad_setting: path passed to the controller as '-v'
        @type  vad_setting: str
        @param ip_addr: address passed to the controller as '-i'
        @type  ip_addr: str
        @param recv_port: port passed to the controller as '-c'
        @type  recv_port: int
        @param send_port: port passed to the controller as '-s'
        @type  send_port: int
        @param log_level: log level passed to the controller as '-l'
        @type  log_level: str
        """
        self._app_dir = app_dir
        self._controller = os.path.join(app_dir, controller)
        self._nict_mmcv = os.path.join(app_dir, nict_mmcv)
        self._mmse_setting = os.path.join(app_dir, mmse_setting)
        self._vad_setting = os.path.join(app_dir, vad_setting)
        self._cwd = app_dir
        self._ip_addr = ip_addr
        self._recv_port = recv_port
        self._send_port = send_port
        self._log_level = log_level
        self._process = None

    def launch(self):
        """
        launch application
        @raise SignalProcessingInterfaceException:
               NICTmmcvController is not installed, or the process could
               not be started for another reason.
        """
        # create arguments
        args = [
            self._controller,
            '-i', self._ip_addr,
            '-m', self._mmse_setting,
            '-v', self._vad_setting,
            '-b', self._nict_mmcv,
            '-l', self._log_level,
            '-s', str(self._send_port),
            '-c', str(self._recv_port)
        ]

        # the child's stderr is discarded
        devnull = open(os.devnull, 'w')
        options = {
            'stdin': subprocess.PIPE,
            'stdout': subprocess.PIPE,
            'stderr': devnull,
            'cwd': self._cwd,
            'close_fds': True
        }

        # launch process
        # NOTE(review): message and argument are passed separately; whether
        # they get %-formatted depends on SignalProcessingInterfaceException,
        # which is defined elsewhere - verify.
        try:
            self._process = subprocess.Popen(args, **options)

        except (OSError, ValueError) as err:
            raise SignalProcessingInterfaceException(
                'NICTmmcvController is not installed. Exception:%s',
                str(err)
            )

        except Exception:
            raise SignalProcessingInterfaceException(
                'unknown exception. Traceback: %s',
                traceback.format_exc()
            )

        finally:
            # The child received its own duplicate of this descriptor, so
            # the parent's copy is no longer needed (previously leaked).
            devnull.close()

    def _ensure_launched(self):
        """
        Check that launch() has created the child process.
        @raise SignalProcessingInterfaceException: NICTmmcvController is
               not launched
        """
        if self._process is None:
            raise SignalProcessingInterfaceException(
                'NICTmmcvController is not launched.'
            )

    def read(self, data_size):
        """
        read data from stdout
        @param data_size: read data size
        @type  data_size: integer
        @return: read data
        @rtype: str
        @raise SignalProcessingInterfaceException:
               NICTmmcvController is not launched
        """
        self._ensure_launched()
        return self._process.stdout.read(data_size)

    def write(self, data):
        """
        write data to the child's stdin and flush it
        @param data: write data
        @type  data: str
        @raise SignalProcessingInterfaceException:
               NICTmmcvController is not launched
        """
        self._ensure_launched()
        self._process.stdin.write(data)
        self._process.stdin.flush()

    def poll(self):
        """
        polling process
        @return: None while the process runs, otherwise its exit code
        @rtype: int or None
        @raise SignalProcessingInterfaceException:
               NICTmmcvController is not launched
        """
        self._ensure_launched()
        return self._process.poll()

    def shutdown(self):
        """
        shutdown application

        Kills the child if it is still running, reaps it and closes the
        pipes. Does nothing when no process has been launched.
        """
        process = self._process
        if process is None:
            return
        self._process = None

        try:
            # Only signal a process that has not been reaped yet: on
            # Python 2 kill() does not check this and raises OSError
            # (or could hit a recycled pid) after poll() reaped the child.
            if process.poll() is None:
                process.kill()
            process.wait()
        finally:
            for pipe in (process.stdin, process.stdout):
                if pipe is None:
                    continue
                try:
                    pipe.close()
                except (IOError, OSError):
                    # best-effort cleanup: the child is already gone, a
                    # failing close (e.g. broken pipe) is not actionable
                    pass
00215             self._process = None
00216 
00217 
class FrameSyncHeader:
    """
    Frame Sync Header class

    Marker values that open each frame-sync packet, plus helpers to
    validate a marker and to turn it into a printable name. INIT is not
    read from a packet here; it serves as the initial state value.
    """
    INIT = -1
    DATA_MARK = 0
    START_MARK = 1
    END_MARK = 2
    TOF_MARK = 9
    EOF_MARK = 10

    # marker value -> printable name
    __HEADER_TO_STR = dict([
        (INIT, 'INIT'),
        (DATA_MARK, 'DATA_MARK'),
        (START_MARK, 'START_MARK'),
        (END_MARK, 'END_MARK'),
        (TOF_MARK, 'TOF_MARK'),
        (EOF_MARK, 'EOF_MARK'),
    ])

    @classmethod
    def is_valid(cls, header):
        """ Return True when header is one of the known marker values. """
        known_headers = cls.__HEADER_TO_STR.keys()
        return header in known_headers

    @classmethod
    def to_str(cls, header):
        """ Return the marker's name, or 'UNKNOWN' for any other value. """
        return cls.__HEADER_TO_STR.get(header, 'UNKNOWN')
00247             return cls.__HEADER_TO_STR[header]
00248         return 'UNKNOWN'
00249 
00250 
class ConnectorInterface(threading.Thread):
    """
    application controller interface class

    Thread that launches the NICTmmcvController process through a
    Connector, reads frame-sync packets from it and forwards audio to the
    registered callbacks: streaming packets to the stream callback, and a
    complete wave file per utterance to the wav callback.
    """
    @accepts(
        app_dir=basestring,
        controller=basestring,
        nict_mmcv=basestring,
        mmse_setting=basestring,
        vad_setting=basestring,
        ip_addr=basestring,
        recv_port=int,
        send_port=int
    )
    def __init__(
        self,
        app_dir,
        controller,
        nict_mmcv,
        mmse_setting,
        vad_setting,
        ip_addr,
        recv_port,
        send_port,
        log_level
    ):
        """
        initialize function

        @param app_dir: application directory
        @type  app_dir: str
        @param controller: controller name
        @type  controller: str
        @param nict_mmcv: mmcv application path
        @type  nict_mmcv: str
        @param mmse_setting: mmse setting file path
        @type  mmse_setting: str
        @param vad_setting: vad setting file path
        @type  vad_setting: str
        @param ip_addr: controller ip address
        @type  ip_addr: str
        @param recv_port: data receive port
        @type  recv_port: int
        @param send_port: data send port
        @type  send_port: int
        @param log_level: loglevel
        @type  log_level: str
        """
        threading.Thread.__init__(self)
        self._connector = Connector(
            app_dir, controller, nict_mmcv, mmse_setting,
            vad_setting, ip_addr, recv_port, send_port, log_level
        )
        self._stop_request = threading.Event()
        self._callback = None
        self._voice_data = []       # samples of the current utterance
        self._stream_callback = None
        self.FRAME_SYNC_DATA_SIZE = 320     # payload bytes per frame
        self.FRAME_SYNC_HEADER_SIZE = 4     # one little-endian uint32 marker
        self.STREAMING_SEND_NUM = 5
        self.STREAMING_DATA_SIZE = self.STREAMING_SEND_NUM * self.FRAME_SYNC_DATA_SIZE
        self.FRAME_SYNC_SIZE = self.FRAME_SYNC_HEADER_SIZE + self.FRAME_SYNC_DATA_SIZE

    def join(self, timeout=None):
        """
        shutdown application

        Requests the reader loop to stop, then waits for the thread. The
        stop flag is only checked between frames, so this may wait until
        the controller delivers the next frame.
        """
        self._stop_request.set()
        super(ConnectorInterface, self).join(timeout)

    def register_callback(self, callback):
        """
        get wav data
        @param callback: called with the wave file bytes of each utterance
        @type  callback: function
        """
        self._callback = callback

    def register_stream_callback(self, callback):
        """
        register the streaming callback
        @param callback: called as callback(packet_type, packet_data)
        @type  callback: function
        """
        self._stream_callback = callback

    def set_play_sound_state(self, state):
        """
        set play sound state by sending a command to the controller's stdin
        @param state: play sound state
        @type  state: True for playing sound.
                      False for NOT playing sound.
        """
        msg = 'play_sound_on\n\n'

        if not state:
            msg = 'play_sound_off\n\n'

        self._connector.write(msg)

    def _send_streaming_data(self, packet_type, packet_data=''):
        """
        forward one packet to the stream callback, if one is registered
        @param packet_type: SignalProcessingStream packet type
        @type  packet_type: int
        @param packet_data: packed sample data ('' for START/END packets)
        @type  packet_data: str
        """
        if self._stream_callback:
            self._stream_callback(packet_type, packet_data)

    def _split_framesync_packet(self, recv_data):
        """
        split one frame-sync packet into its header and payload
        @param recv_data: raw packet read from the controller
        @type  recv_data: str
        @return: (header, data). header is None when recv_data does not
                 have FRAME_SYNC_SIZE bytes; data is a tuple of unsigned
                 16-bit little-endian samples for DATA frames, else None
        @rtype: tuple
        """
        header = None
        data = None
        if len(recv_data) == self.FRAME_SYNC_SIZE:
            raw_header = recv_data[0:self.FRAME_SYNC_HEADER_SIZE]
            header = struct.unpack('<I', raw_header)[0]

        if header == FrameSyncHeader.DATA_MARK:
            # 2 bytes per sample; integer division keeps '%d' exact
            fmt = '<%dH' % (self.FRAME_SYNC_DATA_SIZE // 2)
            data = struct.unpack(
                fmt,
                recv_data[self.FRAME_SYNC_HEADER_SIZE:self.FRAME_SYNC_SIZE]
            )

        return header, data

    def _send_streaming_data_packet(self, streaming_data):
        """
        pack samples and send them as a DATA packet (nothing if empty)
        @param streaming_data: sample values
        @type  streaming_data: list of int
        """
        if len(streaming_data) > 0:
            fmt = '{}H'.format(len(streaming_data))
            pack_data = struct.pack(fmt, *streaming_data)
            self._send_streaming_data(
                SignalProcessingStream.DATA,
                pack_data
            )

    def run(self):
        """
        thread main

        Launches the controller, then reads and dispatches frame-sync
        packets until join() requests a stop. The controller is shut down
        whenever the loop ends, including when it ends with an exception.

        @raise SignalProcessingInterfaceException: the controller could
               not be launched, terminated, or sent an unknown header.
        """
        streaming_data = []
        audio_state = FrameSyncHeader.INIT
        # states in which DATA frames are accepted and END closes an utterance
        utterance_states = (
            FrameSyncHeader.START_MARK,
            FrameSyncHeader.DATA_MARK
        )
        terminated_msg = 'NICTmmcvController is terminated.'

        # launch application
        self._connector.launch()

        try:
            # main loop
            while not self._stop_request.is_set():
                # poll() returns the exit code once the child has ended;
                # exit code 0 is falsy, so compare against None explicitly
                if self._connector.poll() is not None:
                    raise SignalProcessingInterfaceException(terminated_msg)

                # read application data
                recv_data = self._connector.read(self.FRAME_SYNC_SIZE)

                # a short read means the child's stdout reached EOF
                if len(recv_data) < self.FRAME_SYNC_SIZE:
                    raise SignalProcessingInterfaceException(terminated_msg)

                # split framesync data to header and data
                header, voice_data = self._split_framesync_packet(recv_data)

                # DATA Frame
                if header == FrameSyncHeader.DATA_MARK:
                    # data outside an utterance (before START) is dropped
                    if audio_state in utterance_states:
                        streaming_data.extend(voice_data)
                        self._voice_data.extend(voice_data)
                        # NOTE(review): streaming_data counts samples while
                        # STREAMING_DATA_SIZE counts bytes (5 frames), so a
                        # packet is sent only after 11 frames - confirm intent.
                        if len(streaming_data) > self.STREAMING_DATA_SIZE:
                            self._send_streaming_data_packet(streaming_data)
                            streaming_data = []
                        audio_state = FrameSyncHeader.DATA_MARK

                # START Frame
                elif header == FrameSyncHeader.START_MARK:
                    # send start packet
                    logger.debug('START_MARK received')
                    self._send_streaming_data(SignalProcessingStream.START)
                    audio_state = FrameSyncHeader.START_MARK

                # END Frame
                elif header == FrameSyncHeader.END_MARK:
                    logger.debug('END_MARK received')

                    if audio_state in utterance_states:
                        # flush remaining samples, then send end packet
                        self._send_streaming_data_packet(streaming_data)
                        streaming_data = []
                        self._send_streaming_data(SignalProcessingStream.END)

                        # deliver the whole utterance as a wave file
                        if self._callback:
                            wav = WaveFile()
                            wav_data = wav.build(self._voice_data)
                            self._callback(wav_data)

                        self._voice_data = []
                    audio_state = FrameSyncHeader.END_MARK

                # TOF Frame
                elif header == FrameSyncHeader.TOF_MARK:
                    logger.debug('TOF_MARK received')
                    audio_state = FrameSyncHeader.END_MARK

                # EOF Frame
                elif header == FrameSyncHeader.EOF_MARK:
                    logger.debug('EOF_MARK received')
                    audio_state = FrameSyncHeader.END_MARK

                else:
                    msg = 'Invalid header: {}'.format(header)
                    raise SignalProcessingInterfaceException(msg)

        finally:
            # shutdown connector even when the loop raised, so the
            # controller process does not outlive this thread
            self._connector.shutdown()
00471         # shutdown connector
00472         self._connector.shutdown()


rospeex_core
Author(s): Komei Sugiura
autogenerated on Thu Jun 6 2019 18:53:10