spi_connector.py
Go to the documentation of this file.
00001 # -*- coding: utf-8 -*-
00002 
00003 # python libraries
00004 import time
00005 import socket
00006 import struct
00007 import threading
00008 import copy
00009 import select
00010 
00011 # pypi libraries
00012 
00013 # local libraries
00014 import logging_util
00015 import framesync
00016 
00017 
00018 __all__ = [
00019     'SPIConnector'
00020 ]
00021 
00022 
00023 logger = logging_util.get_logger(__file__)
00024 
00025 
00026 class ConnectorBase(threading.Thread):
00027     """ ConnectorBase class
00028     """
00029     RECONNECT_TIME = 2.0
00030 
00031     def __init__(self):
00032         """ init class """
00033         super(ConnectorBase, self).__init__()
00034         self._recv_callback = None
00035         self._connection = None
00036         self._is_connected = False
00037         self._keep_alive = True
00038         self._state = True
00039         self._ext_msg = None
00040 
00041     def _create_connection(self):
00042         """ create connection
00043         """
00044         pass
00045 
00046     def connect(self):
00047         """ connect to audio server
00048         """
00049         if self._connection:
00050             self.disconnect()
00051 
00052         try:
00053             self._connection = self._create_connection()
00054             self._is_connected = True
00055             logger.info('[%s] connected.', self.__class__.__name__)
00056 
00057         except Exception as err:
00058             logger.warn('[%s] %s', self.__class__.__name__, err)
00059             self.disconnect()
00060             time.sleep(self.RECONNECT_TIME)
00061 
00062     def disconnect(self):
00063         """ disconnect from server / client """
00064         if self._is_connected:
00065             if self._connection is not None:
00066                 self._connection.close()
00067                 self._connection = None
00068             self._is_connected = False
00069 
00070     def set_recv_callback(self, recv_cb):
00071         """ set receive callback
00072 
00073         :param recv_cb:
00074         """
00075         self._recv_callback = recv_cb
00076 
00077     def reset_recv_callback(self):
00078         """ reset receive callback
00079         """
00080         self._recv_callback = None
00081 
00082     def send(self, packet):
00083         """ send packet
00084 
00085         :param packet:
00086         """
00087         if self.is_connected():
00088             self._connection.sendall(packet)
00089         else:
00090             logger.warn('[%s] is not connected.', self.__class__.__name__)
00091 
00092     def is_connected(self):
00093         """ check connection
00094 
00095         :returns: True for connected / False for disconnected.
00096         """
00097         return self._is_connected
00098 
00099     def get_state(self):
00100         """ check connector status
00101 
00102         :returns: True for running / False for some error occured.
00103         """
00104         return self._state
00105 
00106     def set_state(self, state):
00107         """ set connector status
00108 
00109         :param state:
00110         """
00111         self._state = state
00112 
00113     def set_ext_msg(self, msg):
00114         """ set exception message
00115 
00116         :param msg:
00117         """
00118         self._ext_msg = msg
00119 
00120     def run(self):
00121         while self._keep_alive:
00122             try:
00123                 # process
00124                 self.connect()
00125                 if self.is_connected():
00126                     self._process()
00127 
00128             except Exception as err:
00129                 logger.warn(str(err))
00130                 logger.info('[%s] retry to connect.', self.__class__.__name__)
00131                 self.set_state(False)
00132 
00133             finally:
00134                 self.disconnect()
00135                 time.sleep(self.RECONNECT_TIME)
00136                 logger.info('[%s] disconnect.', self.__class__.__name__)
00137                 self.set_ext_msg(None)
00138 
00139         logger.info('[%s] exit success.', self.__class__.__name__)
00140 
00141     def end(self):
00142         self._keep_alive = False
00143         self.disconnect()
00144 
00145     def _process(self):
00146         pass
00147 
00148 
00149 class VADConnector(ConnectorBase):
00150     """ VADConnector class
00151         本接続クラスは、音声情報の通信に使用される
00152     """
00153     def __init__(self, host, port, received_cb):
00154         """ init class """
00155         super(VADConnector, self).__init__()
00156         self._host = host
00157         self._port = port
00158         self._received_cb = received_cb
00159         self._received_packet = ''
00160         self._send_packet_que = []
00161         self._send_packet_lock = threading.Lock()
00162 
00163     def _create_connection(self):
00164         """ create connection
00165         """
00166         msg = '[{name}] create connection. {host}:{port}'.format(
00167             host=self._host,
00168             port=self._port,
00169             name=self.__class__.__name__
00170         )
00171         logger.info(msg)
00172         connection = socket.socket(
00173             socket.AF_INET,
00174             socket.SOCK_STREAM
00175         )
00176         connection.settimeout(10.0)
00177         connection.connect((self._host, self._port))
00178         return connection
00179 
00180     def _process(self):
00181         """ process data
00182         """
00183         while self._keep_alive:
00184             # read from socket
00185             rready, wready, xready = select.select(
00186                 [self._connection],
00187                 [],
00188                 [],
00189                 0.01
00190             )
00191 
00192             for sock in rready:
00193                 recv_packet = sock.recv(framesync.VAD_FRAME_SIZE)
00194                 self._process_packet(recv_packet)
00195 
00196             # write socket
00197             send_packet = self._pop_packet()
00198             for packet in send_packet:
00199                 super(VADConnector, self).send(packet)
00200 
00201             # if error occured outside
00202             if self._ext_msg:
00203                 logger.warn('[%s] %s', self.__class__.__name__, self._ext_msg)
00204                 break
00205 
00206     def _process_packet(self, packet):
00207         """ proces packet data
00208 
00209         :param packet:
00210         """
00211         # TODO test
00212         logger.debug('receive packet [%d] from vad server.', len(packet))
00213         self._received_packet += packet
00214         while len(self._received_packet) >= framesync.VAD_FRAME_SIZE:
00215             # get new packet
00216             slice_packet = self._received_packet[:framesync.VAD_FRAME_SIZE]
00217             self._received_packet = self._received_packet[framesync.VAD_FRAME_SIZE:]
00218 
00219             # get header data
00220             header = struct.unpack('>1I', slice_packet[:framesync.VAD_HEADER_SIZE])[0]
00221             if header == framesync.HEADER_DATA_MARK:
00222                 pass
00223             elif header == framesync.HEADER_START_MARK:
00224                 logger.debug('receive start mark')
00225             elif header == framesync.HEADER_END_MARK:
00226                 logger.debug('receive end mark')
00227             elif header == framesync.HEADER_TOF_MARK:
00228                 logger.debug('receive tof mark')
00229             elif header == framesync.HEADER_EOF_MARK:
00230                 logger.debug('receive eof mark')
00231             else:
00232                 logger.debug('invalid mark %d', header)
00233             self._received_cb(slice_packet)
00234 
00235     def send(self, packet):
00236         """ send packet
00237 
00238         :param packet:
00239         """
00240         # なぜ、ブラウザから送られてくるヘッダ情報がリトルエンディアンになっているのか?
00241         header = struct.unpack('<1I', bytes(packet[0:4]))[0]
00242         logger.debug('receive frame sync header [%d] from browser.', header)
00243         if header in framesync.VALID_HEADER_MARK_LIST:
00244         # if header in [framesync.HEADER_TOF_MARK, framesync.HEADER_DATA_MARK]:
00245             self._push_packet(packet)
00246             # super(VADConnector, self).send(packet)
00247         else:
00248             logger.warn('invald frame header [%d]', header)
00249 
00250     def _push_packet(self, packet):
00251         with self._send_packet_lock:
00252             self._send_packet_que.append(packet)
00253             logger.debug('push vad packet. len [%d].', len(self._send_packet_que))
00254 
00255     def _pop_packet(self):
00256         ret_packet = []
00257         with self._send_packet_lock:
00258             ret_packet = copy.copy(self._send_packet_que)
00259             self._send_packet_que = []
00260         return ret_packet
00261 
00262 
00263 class EPDConnector(ConnectorBase):
00264     """ EPDConnector class
00265         本サーバクラスは、EPD命令を受信するために使用される
00266     """
00267     def __init__(self, host, port, received_cb):
00268         """ init class """
00269         super(EPDConnector, self).__init__()
00270         self._host = host
00271         self._port = port
00272         self._sock = socket.socket(
00273             socket.AF_INET,
00274             socket.SOCK_STREAM
00275         )
00276         self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
00277         self._sock.bind((self._host, self._port))
00278         self._sock.settimeout(10.0)
00279         self._received_cb = received_cb
00280         self._send_packet_que = []
00281         self._send_packet_lock = threading.Lock()
00282 
00283     def _create_connection(self):
00284         """ create connection
00285         """
00286         msg = '[{name}] create connection. {host}:{port}'.format(
00287             host=self._host,
00288             port=self._port,
00289             name=self.__class__.__name__
00290         )
00291         logger.info(msg)
00292         self._sock.listen(10)
00293         connection, addr = self._sock.accept()
00294         return connection
00295 
00296     def _process(self):
00297         """ process data
00298         """
00299         while self._keep_alive:
00300             # read from socket
00301             rready, wready, xready = select.select(
00302                 [self._connection],
00303                 [],
00304                 [],
00305                 0.1
00306             )
00307 
00308             for sock in rready:
00309                 recv_packet = sock.recv(1024)
00310                 self._received_cb(recv_packet)
00311 
00312             # write socket
00313             send_packet = self._pop_packet()
00314             for packet in send_packet:
00315                 super(EPDConnector, self).send(packet)
00316 
00317             # if error occured outside
00318             if self._ext_msg:
00319                 logger.warn('[%s] %s', self.__class__.__name__, self._ext_msg)
00320                 break
00321 
00322     def send(self, packet):
00323         """ send packet
00324 
00325         :param packet:
00326         """
00327         self._push_packet(packet)
00328 
00329     def _push_packet(self, packet):
00330         with self._send_packet_lock:
00331             self._send_packet_que.append(packet)
00332 
00333     def _pop_packet(self):
00334         ret_packet = []
00335         with self._send_packet_lock:
00336             ret_packet = copy.copy(self._send_packet_que)
00337             self._send_packet_que = []
00338         return ret_packet
00339 
00340 
00341 class SPIConnector(threading.Thread):
00342     """ SPIConnector class
00343     """
00344     def __init__(self, vad_host, vad_port, epd_host, epd_port):
00345         super(SPIConnector, self).__init__()
00346         self._vad_connected_cb = None
00347         self._vad_disconnected_cb = None
00348         self._vad_received_cb = None
00349         self._epd_received_cb = None
00350 
00351         # init audio connector
00352         self._vad_connector = VADConnector(
00353             host=vad_host,
00354             port=vad_port,
00355             received_cb=self._vad_received
00356         )
00357 
00358         # init epd connector
00359         self._epd_connector = EPDConnector(
00360             host=epd_host,
00361             port=epd_port,
00362             received_cb=self._epd_received
00363         )
00364 
00365         self._is_connected = False
00366         self._keep_alive = True
00367 
00368     def run(self):
00369         """ run thread """
00370         # start client / server
00371         self._vad_connector.start()
00372         self._epd_connector.start()
00373 
00374         while self._keep_alive:
00375             self._check_connection()
00376             time.sleep(1)
00377 
00378             if not self._vad_connector.get_state():
00379                 self._epd_connector.set_ext_msg('vad connector error occured')
00380             elif not self._epd_connector.get_state():
00381                 self._vad_connector.set_ext_msg('epd connector error occured')
00382             self._vad_connector.set_state(True)
00383             self._epd_connector.set_state(True)
00384 
00385     def end(self):
00386         # end audio connector
00387         self._vad_connector.end()
00388         self._vad_connector.join(0.1)
00389 
00390         # end epd connector
00391         self._epd_connector.end()
00392         self._epd_connector.join(0.1)
00393 
00394         # end spi connector
00395         self._keep_alive = False
00396 
00397     def _vad_received(self, packet):
00398         if self._vad_received_cb:
00399             self._vad_received_cb(packet)
00400 
00401     def _epd_received(self, packet):
00402         if self._epd_received_cb:
00403             self._epd_received_cb(packet)
00404 
00405     def _check_connection(self):
00406         """ check connection
00407 
00408         :returns:
00409         """
00410         # get connection state
00411         auido_con = self._vad_connector.is_connected()
00412         epd_con = self._epd_connector.is_connected()
00413         connected = auido_con and epd_con
00414 
00415         # check state changed
00416         if connected and not self._is_connected:
00417             # state [disconnected -> connected]
00418             if self._vad_connected_cb:
00419                 self._vad_connected_cb()
00420             logger.info('vad connection state changed. [Disconnected->Connected]')
00421             self._is_connected = True
00422 
00423         elif not connected and self._is_connected:
00424             # state [connected -> disconnected]
00425             if self._vad_disconnected_cb:
00426                 self._vad_disconnected_cb()
00427             logger.info('vad connection state changed. [Connected->Disconnected]')
00428             self._is_connected = False
00429         return connected
00430 
00431     def set_vad_callback(self, vad_connected, vad_disconnected):
00432         """ set vad callback
00433 
00434         :param vad_connected:
00435         :param vad_disconnected:
00436         """
00437         self._vad_connected_cb = vad_connected
00438         self._vad_disconnected_cb = vad_disconnected
00439 
00440     def reset_vad_callback(self):
00441         """ reset vad callback
00442         """
00443         self._vad_connected_cb = None
00444         self._vad_disconnected_cb = None
00445 
00446     def set_recv_callback(self, vad_received, epd_received):
00447         self._vad_received_cb = vad_received
00448         self._epd_received_cb = epd_received
00449 
00450     def reset_recv_callback(self):
00451         self._vad_received_cb = None
00452         self._epd_received_cb = None
00453 
00454     def send_vad_packet(self, packet):
00455         """ send audio packet
00456 
00457         :param packet:
00458         """
00459         self._vad_connector.send(packet)
00460 
00461     def send_epd_packet(self, packet):
00462         """ send epd packet
00463 
00464         :param packet:
00465         """
00466         self._epd_connector.send(packet)
00467 
00468 
00469 def main():
00470     logging_util.setup_logging()
00471 
00472     vad_host = '127.0.0.1'
00473     vad_port = 16001
00474 
00475     epd_host = '127.0.0.1'
00476     epd_port = 5002
00477 
00478     connector = SPIConnector(
00479         vad_host=vad_host,
00480         vad_port=vad_port,
00481         epd_host=epd_host,
00482         epd_port=epd_port
00483     )
00484 
00485     try:
00486         connector.start()
00487         logger.info('wait for [enter]')
00488         raw_input()
00489 
00490     except KeyboardInterrupt:
00491         pass
00492 
00493     except Exception as err:
00494         logger.error(str(err))
00495 
00496     finally:
00497         logger.info('exit connector')
00498         connector.end()
00499         logger.info('wait end threads')
00500         connector.join(0.1)
00501         logger.info('finish process')
00502 
00503 
00504 if __name__ == '__main__':
00505     import sys
00506     sys.exit(main())


rospeex_webaudiomonitor
Author(s): Komei Sugiura
autogenerated on Thu Jun 6 2019 18:53:16