00001
00002
00003
00004 import time
00005 import socket
00006 import struct
00007 import threading
00008 import copy
00009 import select
00010
00011
00012
00013
00014 import logging_util
00015 import framesync
00016
00017
00018 __all__ = [
00019 'SPIConnector'
00020 ]
00021
00022
00023 logger = logging_util.get_logger(__file__)
00024
00025
00026 class ConnectorBase(threading.Thread):
00027 """ ConnectorBase class
00028 """
00029 RECONNECT_TIME = 2.0
00030
00031 def __init__(self):
00032 """ init class """
00033 super(ConnectorBase, self).__init__()
00034 self._recv_callback = None
00035 self._connection = None
00036 self._is_connected = False
00037 self._keep_alive = True
00038 self._state = True
00039 self._ext_msg = None
00040
00041 def _create_connection(self):
00042 """ create connection
00043 """
00044 pass
00045
00046 def connect(self):
00047 """ connect to audio server
00048 """
00049 if self._connection:
00050 self.disconnect()
00051
00052 try:
00053 self._connection = self._create_connection()
00054 self._is_connected = True
00055 logger.info('[%s] connected.', self.__class__.__name__)
00056
00057 except Exception as err:
00058 logger.warn('[%s] %s', self.__class__.__name__, err)
00059 self.disconnect()
00060 time.sleep(self.RECONNECT_TIME)
00061
00062 def disconnect(self):
00063 """ disconnect from server / client """
00064 if self._is_connected:
00065 if self._connection is not None:
00066 self._connection.close()
00067 self._connection = None
00068 self._is_connected = False
00069
00070 def set_recv_callback(self, recv_cb):
00071 """ set receive callback
00072
00073 :param recv_cb:
00074 """
00075 self._recv_callback = recv_cb
00076
00077 def reset_recv_callback(self):
00078 """ reset receive callback
00079 """
00080 self._recv_callback = None
00081
00082 def send(self, packet):
00083 """ send packet
00084
00085 :param packet:
00086 """
00087 if self.is_connected():
00088 self._connection.sendall(packet)
00089 else:
00090 logger.warn('[%s] is not connected.', self.__class__.__name__)
00091
00092 def is_connected(self):
00093 """ check connection
00094
00095 :returns: True for connected / False for disconnected.
00096 """
00097 return self._is_connected
00098
00099 def get_state(self):
00100 """ check connector status
00101
00102 :returns: True for running / False for some error occured.
00103 """
00104 return self._state
00105
00106 def set_state(self, state):
00107 """ set connector status
00108
00109 :param state:
00110 """
00111 self._state = state
00112
00113 def set_ext_msg(self, msg):
00114 """ set exception message
00115
00116 :param msg:
00117 """
00118 self._ext_msg = msg
00119
00120 def run(self):
00121 while self._keep_alive:
00122 try:
00123
00124 self.connect()
00125 if self.is_connected():
00126 self._process()
00127
00128 except Exception as err:
00129 logger.warn(str(err))
00130 logger.info('[%s] retry to connect.', self.__class__.__name__)
00131 self.set_state(False)
00132
00133 finally:
00134 self.disconnect()
00135 time.sleep(self.RECONNECT_TIME)
00136 logger.info('[%s] disconnect.', self.__class__.__name__)
00137 self.set_ext_msg(None)
00138
00139 logger.info('[%s] exit success.', self.__class__.__name__)
00140
00141 def end(self):
00142 self._keep_alive = False
00143 self.disconnect()
00144
00145 def _process(self):
00146 pass
00147
00148
00149 class VADConnector(ConnectorBase):
00150 """ VADConnector class
00151 本接続クラスは、音声情報の通信に使用される
00152 """
00153 def __init__(self, host, port, received_cb):
00154 """ init class """
00155 super(VADConnector, self).__init__()
00156 self._host = host
00157 self._port = port
00158 self._received_cb = received_cb
00159 self._received_packet = ''
00160 self._send_packet_que = []
00161 self._send_packet_lock = threading.Lock()
00162
00163 def _create_connection(self):
00164 """ create connection
00165 """
00166 msg = '[{name}] create connection. {host}:{port}'.format(
00167 host=self._host,
00168 port=self._port,
00169 name=self.__class__.__name__
00170 )
00171 logger.info(msg)
00172 connection = socket.socket(
00173 socket.AF_INET,
00174 socket.SOCK_STREAM
00175 )
00176 connection.settimeout(10.0)
00177 connection.connect((self._host, self._port))
00178 return connection
00179
00180 def _process(self):
00181 """ process data
00182 """
00183 while self._keep_alive:
00184
00185 rready, wready, xready = select.select(
00186 [self._connection],
00187 [],
00188 [],
00189 0.01
00190 )
00191
00192 for sock in rready:
00193 recv_packet = sock.recv(framesync.VAD_FRAME_SIZE)
00194 self._process_packet(recv_packet)
00195
00196
00197 send_packet = self._pop_packet()
00198 for packet in send_packet:
00199 super(VADConnector, self).send(packet)
00200
00201
00202 if self._ext_msg:
00203 logger.warn('[%s] %s', self.__class__.__name__, self._ext_msg)
00204 break
00205
00206 def _process_packet(self, packet):
00207 """ proces packet data
00208
00209 :param packet:
00210 """
00211
00212 logger.debug('receive packet [%d] from vad server.', len(packet))
00213 self._received_packet += packet
00214 while len(self._received_packet) >= framesync.VAD_FRAME_SIZE:
00215
00216 slice_packet = self._received_packet[:framesync.VAD_FRAME_SIZE]
00217 self._received_packet = self._received_packet[framesync.VAD_FRAME_SIZE:]
00218
00219
00220 header = struct.unpack('>1I', slice_packet[:framesync.VAD_HEADER_SIZE])[0]
00221 if header == framesync.HEADER_DATA_MARK:
00222 pass
00223 elif header == framesync.HEADER_START_MARK:
00224 logger.debug('receive start mark')
00225 elif header == framesync.HEADER_END_MARK:
00226 logger.debug('receive end mark')
00227 elif header == framesync.HEADER_TOF_MARK:
00228 logger.debug('receive tof mark')
00229 elif header == framesync.HEADER_EOF_MARK:
00230 logger.debug('receive eof mark')
00231 else:
00232 logger.debug('invalid mark %d', header)
00233 self._received_cb(slice_packet)
00234
00235 def send(self, packet):
00236 """ send packet
00237
00238 :param packet:
00239 """
00240
00241 header = struct.unpack('<1I', bytes(packet[0:4]))[0]
00242 logger.debug('receive frame sync header [%d] from browser.', header)
00243 if header in framesync.VALID_HEADER_MARK_LIST:
00244
00245 self._push_packet(packet)
00246
00247 else:
00248 logger.warn('invald frame header [%d]', header)
00249
00250 def _push_packet(self, packet):
00251 with self._send_packet_lock:
00252 self._send_packet_que.append(packet)
00253 logger.debug('push vad packet. len [%d].', len(self._send_packet_que))
00254
00255 def _pop_packet(self):
00256 ret_packet = []
00257 with self._send_packet_lock:
00258 ret_packet = copy.copy(self._send_packet_que)
00259 self._send_packet_que = []
00260 return ret_packet
00261
00262
00263 class EPDConnector(ConnectorBase):
00264 """ EPDConnector class
00265 本サーバクラスは、EPD命令を受信するために使用される
00266 """
00267 def __init__(self, host, port, received_cb):
00268 """ init class """
00269 super(EPDConnector, self).__init__()
00270 self._host = host
00271 self._port = port
00272 self._sock = socket.socket(
00273 socket.AF_INET,
00274 socket.SOCK_STREAM
00275 )
00276 self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
00277 self._sock.bind((self._host, self._port))
00278 self._sock.settimeout(10.0)
00279 self._received_cb = received_cb
00280 self._send_packet_que = []
00281 self._send_packet_lock = threading.Lock()
00282
00283 def _create_connection(self):
00284 """ create connection
00285 """
00286 msg = '[{name}] create connection. {host}:{port}'.format(
00287 host=self._host,
00288 port=self._port,
00289 name=self.__class__.__name__
00290 )
00291 logger.info(msg)
00292 self._sock.listen(10)
00293 connection, addr = self._sock.accept()
00294 return connection
00295
00296 def _process(self):
00297 """ process data
00298 """
00299 while self._keep_alive:
00300
00301 rready, wready, xready = select.select(
00302 [self._connection],
00303 [],
00304 [],
00305 0.1
00306 )
00307
00308 for sock in rready:
00309 recv_packet = sock.recv(1024)
00310 self._received_cb(recv_packet)
00311
00312
00313 send_packet = self._pop_packet()
00314 for packet in send_packet:
00315 super(EPDConnector, self).send(packet)
00316
00317
00318 if self._ext_msg:
00319 logger.warn('[%s] %s', self.__class__.__name__, self._ext_msg)
00320 break
00321
00322 def send(self, packet):
00323 """ send packet
00324
00325 :param packet:
00326 """
00327 self._push_packet(packet)
00328
00329 def _push_packet(self, packet):
00330 with self._send_packet_lock:
00331 self._send_packet_que.append(packet)
00332
00333 def _pop_packet(self):
00334 ret_packet = []
00335 with self._send_packet_lock:
00336 ret_packet = copy.copy(self._send_packet_que)
00337 self._send_packet_que = []
00338 return ret_packet
00339
00340
00341 class SPIConnector(threading.Thread):
00342 """ SPIConnector class
00343 """
00344 def __init__(self, vad_host, vad_port, epd_host, epd_port):
00345 super(SPIConnector, self).__init__()
00346 self._vad_connected_cb = None
00347 self._vad_disconnected_cb = None
00348 self._vad_received_cb = None
00349 self._epd_received_cb = None
00350
00351
00352 self._vad_connector = VADConnector(
00353 host=vad_host,
00354 port=vad_port,
00355 received_cb=self._vad_received
00356 )
00357
00358
00359 self._epd_connector = EPDConnector(
00360 host=epd_host,
00361 port=epd_port,
00362 received_cb=self._epd_received
00363 )
00364
00365 self._is_connected = False
00366 self._keep_alive = True
00367
00368 def run(self):
00369 """ run thread """
00370
00371 self._vad_connector.start()
00372 self._epd_connector.start()
00373
00374 while self._keep_alive:
00375 self._check_connection()
00376 time.sleep(1)
00377
00378 if not self._vad_connector.get_state():
00379 self._epd_connector.set_ext_msg('vad connector error occured')
00380 elif not self._epd_connector.get_state():
00381 self._vad_connector.set_ext_msg('epd connector error occured')
00382 self._vad_connector.set_state(True)
00383 self._epd_connector.set_state(True)
00384
00385 def end(self):
00386
00387 self._vad_connector.end()
00388 self._vad_connector.join(0.1)
00389
00390
00391 self._epd_connector.end()
00392 self._epd_connector.join(0.1)
00393
00394
00395 self._keep_alive = False
00396
00397 def _vad_received(self, packet):
00398 if self._vad_received_cb:
00399 self._vad_received_cb(packet)
00400
00401 def _epd_received(self, packet):
00402 if self._epd_received_cb:
00403 self._epd_received_cb(packet)
00404
00405 def _check_connection(self):
00406 """ check connection
00407
00408 :returns:
00409 """
00410
00411 auido_con = self._vad_connector.is_connected()
00412 epd_con = self._epd_connector.is_connected()
00413 connected = auido_con and epd_con
00414
00415
00416 if connected and not self._is_connected:
00417
00418 if self._vad_connected_cb:
00419 self._vad_connected_cb()
00420 logger.info('vad connection state changed. [Disconnected->Connected]')
00421 self._is_connected = True
00422
00423 elif not connected and self._is_connected:
00424
00425 if self._vad_disconnected_cb:
00426 self._vad_disconnected_cb()
00427 logger.info('vad connection state changed. [Connected->Disconnected]')
00428 self._is_connected = False
00429 return connected
00430
00431 def set_vad_callback(self, vad_connected, vad_disconnected):
00432 """ set vad callback
00433
00434 :param vad_connected:
00435 :param vad_disconnected:
00436 """
00437 self._vad_connected_cb = vad_connected
00438 self._vad_disconnected_cb = vad_disconnected
00439
00440 def reset_vad_callback(self):
00441 """ reset vad callback
00442 """
00443 self._vad_connected_cb = None
00444 self._vad_disconnected_cb = None
00445
00446 def set_recv_callback(self, vad_received, epd_received):
00447 self._vad_received_cb = vad_received
00448 self._epd_received_cb = epd_received
00449
00450 def reset_recv_callback(self):
00451 self._vad_received_cb = None
00452 self._epd_received_cb = None
00453
00454 def send_vad_packet(self, packet):
00455 """ send audio packet
00456
00457 :param packet:
00458 """
00459 self._vad_connector.send(packet)
00460
00461 def send_epd_packet(self, packet):
00462 """ send epd packet
00463
00464 :param packet:
00465 """
00466 self._epd_connector.send(packet)
00467
00468
00469 def main():
00470 logging_util.setup_logging()
00471
00472 vad_host = '127.0.0.1'
00473 vad_port = 16001
00474
00475 epd_host = '127.0.0.1'
00476 epd_port = 5002
00477
00478 connector = SPIConnector(
00479 vad_host=vad_host,
00480 vad_port=vad_port,
00481 epd_host=epd_host,
00482 epd_port=epd_port
00483 )
00484
00485 try:
00486 connector.start()
00487 logger.info('wait for [enter]')
00488 raw_input()
00489
00490 except KeyboardInterrupt:
00491 pass
00492
00493 except Exception as err:
00494 logger.error(str(err))
00495
00496 finally:
00497 logger.info('exit connector')
00498 connector.end()
00499 logger.info('wait end threads')
00500 connector.join(0.1)
00501 logger.info('finish process')
00502
00503
00504 if __name__ == '__main__':
00505 import sys
00506 sys.exit(main())