ws_server_tornado.py
Go to the documentation of this file.
00001 # -*- coding: utf-8 -*-
00002 
00003 # python libraries
00004 import os
00005 import sys
00006 import threading
00007 import struct
00008 import logging
00009 
00010 # pypi libraries
00011 import tornado
00012 import tornado.ioloop
00013 import tornado.web
00014 import tornado.websocket
00015 from tornado.ioloop import PeriodicCallback
00016 from tornado.web import StaticFileHandler
00017 
00018 # ros package
00019 import rospkg
00020 
00021 # local libraries
00022 import logging_util
00023 
00024 # import tornado
# Convert tornado's dotted version string into one comparable integer
# (major * 1000**2 + minor * 1000 + revision).  Only the first three
# components are looked at and only their leading digits are used, so
# strings such as '4.0b1', '6.0.dev1' or a four-part version no longer
# raise ValueError / IndexError while importing this module.
version_list = [0, 0, 0]
for idx, value in enumerate(tornado.version.split('.')[:3]):
    digits = ''
    for char in value:
        if not char.isdigit():
            break
        digits += char
    if not digits:
        # non-numeric component (e.g. 'dev1'): keep it and the rest at 0
        break
    version_list[idx] = int(digits)
major, minor, revision = version_list
version = major * 1000**2 + minor * 1000 + revision

# tornado releases below 2.2.0 use the handler shipped in ws_tornado_210;
# everything else uses tornado's own WebSocketHandler.
if version < 2002000:
    from ws_tornado_210 import ModWebSocketHandler as WebSocketHandler
else:
    from tornado.websocket import WebSocketHandler
00035 
00036 
# public API of this module (the class name is spelled this way in the
# class definition, so the entry must match it exactly)
__all__ = ['WebAudioMonotorServer']


# module-level logger, named after this source file
logger = logging_util.get_logger(__file__)
00044 
00045 
class MainHandler(tornado.web.RequestHandler):
    """ request handler that renders the index page template
    """
    def get(self):
        """ answer a GET request with the rendered ``index.html`` """
        template_name = 'index.html'
        self.render(template_name)
00049 
00050 
class MyWebSocketHandler(WebSocketHandler):
    """ tornado websocket handler

    One instance exists per client connection.  The handler registers
    itself with the factory on open, forwards received messages to it and
    keeps the connection alive with periodic pings:

    * a ping is sent every 2000 ms,
    * each ping (re)starts a 1000 ms periodic timeout check,
    * a pong stops the check and resets the timeout counter,
    * once the counter exceeds 100 the connection is closed.
    """
    def initialize(self, factory):
        """ set up per-connection state

        :param factory: object providing register_session,
                        unregister_session and add_recv_packet
        """
        logger.info('initialize handler')
        self.session_id = id(self)
        self.factory = factory
        self.is_open = False
        self.timeout_count = 0
        self.ping_callback = PeriodicCallback(self._send_ping, 2000)
        self.timeout_callback = PeriodicCallback(self._check_timeout, 1000)

    def check_origin(self, origin):
        """ accept connections from every origin

        NOTE(review): returning True unconditionally disables the
        cross-origin check -- confirm this is intended for deployment.
        """
        return True

    def open(self):
        """ connection established: start pinging and register the session
        """
        logger.info('websocket handler opened')
        self.is_open = True
        self.ping_callback.start()
        self.factory.register_session(self)

    def write_message(self, message, binary=False):
        """ send a message unless the connection is closed or closing

        :param message: payload to send
        :param binary: True to send a binary frame
        """
        if not self.is_open:
            return
        connection = self.ws_connection
        # the connection object may already be gone while closing
        if connection is None or connection.client_terminated:
            return
        connection.write_message(message, binary)

    def on_message(self, payload):
        """ forward a received message to the factory

        :param payload: received message
        """
        logger.debug('receive message')
        # ``bytes`` is ``str`` on Python 2, so this is the same test there
        # and stays correct on Python 3 where text frames arrive as ``str``
        is_binary = isinstance(payload, bytes)
        self.factory.add_recv_packet(payload, is_binary, self.session_id)

    def on_pong(self, data):
        """ pong received: the peer is alive, so cancel the timeout check
        """
        logger.debug('on pong [%s]', self.session_id)
        self.timeout_callback.stop()
        self.timeout_count = 0

    def _send_ping(self):
        """ send a ping carrying the session id and restart the timeout check
        """
        logger.debug('send ping [%s]', self.session_id)
        ping_str = '{}'.format(self.session_id).encode('utf-8')
        self.timeout_callback.stop()
        self.timeout_callback.start()
        self.ping(ping_str)

    def _check_timeout(self):
        """ count a missed pong and close the connection after too many
        """
        logger.info('time out occured [%s]', self.session_id)
        self.timeout_count += 1
        if self.timeout_count > 100:
            self.close()
            self.timeout_callback.stop()
            self.factory.unregister_session(self)

    def on_close(self):
        """ connection closed: stop the timers and unregister the session
        """
        logger.info('websocket handler closed [%s]', self.session_id)
        self.ping_callback.stop()
        self.timeout_callback.stop()
        self.is_open = False
        self.factory.unregister_session(self)

    def close(self, *args, **kwargs):
        """ close the connection once; later calls are no-ops

        Any arguments are passed through unchanged to the base class
        ``close`` so callers may supply whatever it accepts.
        """
        if self.is_open:
            self.ping_callback.stop()
            self.timeout_callback.stop()
            self.is_open = False
            # start the lookup after *this* class; starting after
            # WebSocketHandler would skip the websocket close() itself
            super(MyWebSocketHandler, self).close(*args, **kwargs)

    def on_finish(self):
        """ log the end of the request """
        logger.info('finish connection [%s]', self.session_id)

    def onVadConnected(self):
        """ tell the client that the VAD server is connected """
        payload = '[VAD SERVER Connected]'.encode('utf-8')
        self.write_message(payload, False)

    def onVadDisconnected(self):
        """ tell the client that the VAD server is disconnected """
        payload = '[VAD SERVER Disconnected]'.encode('utf-8')
        self.write_message(payload, False)

    def onEpdMessage(self, text):
        """ send an '[SS] ...' text message to the client

        :param text: message text
        """
        payload = '[SS] {text}'.format(text=text).encode('utf-8')
        self.write_message(payload, False)

    def onSRReceived(self, text):
        """ send an '[SR_TEXT] ...' text message to the client

        :param text: UTF-8 encoded bytes or unicode text
        """
        if isinstance(text, bytes):
            text = text.decode('utf-8')
        payload = u'[SR_TEXT] ' + text
        self.write_message(payload.encode('utf-8'), False)

    def onSSReceived(self, text):
        """ send an '[SS_TEXT] ...' text message to the client

        :param text: UTF-8 encoded bytes or unicode text
        """
        if isinstance(text, bytes):
            text = text.decode('utf-8')
        payload = u'[SS_TEXT] ' + text
        self.write_message(payload.encode('utf-8'), False)

    def sendMessage(self, data, binary):
        """ send raw data to the client

        :param data: payload to send
        :param binary: True to send a binary frame
        """
        self.write_message(data, binary)
00142 
00143 
class WebAudioMonitorServerFactory(object):
    """ AudioMonitorServerFactory class

    Keeps track of the connected websocket sessions, dispatches packets
    received from the primary (first registered) session to the callbacks
    and broadcasts outgoing packets / notifications to every session.
    """
    # one-byte packet headers
    _VAD_HEADER = 22
    _SS_TEXT = 30
    _SR_TEXT = 31

    def __init__(self, uri, recv_audio_packet_cb, recv_command_packet_cb):
        """ init function

        :param uri: server uri (accepted but not used by this class)
        :param recv_audio_packet_cb: called with the payload of each binary
                                     packet that carries the VAD header
        :param recv_command_packet_cb: called with each text packet
        """
        self._recv_audio_packet_cb = recv_audio_packet_cb
        self._recv_command_packet_cb = recv_command_packet_cb
        self._clients = {}
        self._vad_connected = False
        self._session_id_list = []
        self._mutex = threading.Lock()

    def add_recv_packet(self, packet, is_binary, session_id):
        """ handle a packet received from a session

        Only packets from the primary session (the first entry of the
        session id list) are processed; packets from other registered
        sessions are dropped silently.

        :param packet: received data
        :param is_binary: True if packet is a binary frame
        :param session_id: id of the session that received the packet
        """
        # work on one snapshot: unregister_session rebinds the list, so
        # reading the attribute twice could see two different lists
        session_ids = self._session_id_list
        if session_id not in session_ids:
            logger.debug('target session [%s] is not found.', session_id)
            return

        # check primary session
        if session_id == session_ids[0]:
            self._recv_packet(packet, is_binary)

    def _recv_packet(self, packet, is_binary):
        """ receive packet process

        :param packet: received data
        :param is_binary: True if packet is a binary frame
        """
        if not is_binary:
            logger.debug('receive string data [%s]', packet)
            self._recv_command_packet_cb(packet)
            return

        logger.debug('receive binary data. length [%d]', len(packet))
        conv_packet = bytearray(packet)
        if not conv_packet:
            # nothing to read a header from
            logger.warn('empty binary packet. ignored.')
            return

        header = conv_packet[0]
        if header == self._VAD_HEADER:
            self._recv_audio_packet_cb(conv_packet[1:])
        else:
            logger.warn('invalid header packet. header [%d]', header)

    def register_session(self, session):
        """ register new session

        :param session: handler object exposing ``session_id``
        """
        session_id = session.session_id
        logger.info('add session [%s]', session_id)

        with self._mutex:
            self._clients[session_id] = session
            self._session_id_list.append(session_id)

        # if vad connected, emit vad connected event.
        if self._vad_connected:
            self.notify_vad_connected()

    def unregister_session(self, session):
        """ delete session

        :param session: handler object exposing ``session_id``
        """
        session_id = session.session_id
        with self._mutex:
            logger.info('delete session [%s]', session_id)
            self._session_id_list = [
                registered_id for registered_id in self._session_id_list
                if registered_id != session_id
            ]
            client = self._clients.pop(session_id, None)

        # close outside the lock so the handler can never re-enter this
        # (non-reentrant) mutex while it is still held
        if client is not None:
            client.close()

        # if vad connected, emit vad connected event.
        # NOTE(review): this re-notifies the remaining clients on every
        # unregister -- confirm that is intended.
        if self._vad_connected:
            self.notify_vad_connected()

    def send_packet(self, packet):
        """ send an audio packet, prefixed with the VAD header, to all clients

        :param packet: payload to send
        """
        with self._mutex:
            send_packet = struct.pack('B', self._VAD_HEADER)
            send_packet += packet
            for client in self._clients.values():
                logger.debug('send packet len: %d', len(send_packet))
                client.sendMessage(send_packet, True)

    def notify_vad_connected(self):
        """ notify vad connected
        """
        with self._mutex:
            self._vad_connected = True
            for key, client in self._clients.items():
                logger.debug('notify to valid client [%s]', key)
                client.onVadConnected()

    def notify_vad_disconnected(self):
        """ notify vad disconnected
        """
        with self._mutex:
            self._vad_connected = False
            for client in self._clients.values():
                client.onVadDisconnected()

    def notify_epd_message(self, message):
        """ notify epd message

        :param message: message text; empty messages are ignored
        """
        if not message:
            return None

        with self._mutex:
            for client in self._clients.values():
                client.onEpdMessage(message)

    def _create_send_text_packet(self, header, text):
        """ build a binary packet: one header byte followed by UTF-8 text

        :param header: header value (0-255)
        :param text: UTF-8 encoded bytes or unicode text
        :returns: packed header + UTF-8 encoded text
        :raises UnicodeDecodeError: if text is bytes that are not valid UTF-8
        """
        if isinstance(text, bytes):
            # round trip keeps the validation of already-encoded input
            encoded_text = text.decode('utf-8').encode('utf-8')
        else:
            encoded_text = text.encode('utf-8')
        send_packet = struct.pack('B', header)
        send_packet += encoded_text
        return send_packet

    def _broadcast_text(self, header, text):
        """ send a header + text packet to every client as a binary frame """
        send_packet = self._create_send_text_packet(header, text)
        with self._mutex:
            for client in self._clients.values():
                client.sendMessage(send_packet, True)

    def notify_ss_text(self, text):
        """ notify speech synthesis result

        :param text: synthesis text; empty text is ignored
        """
        if not text:
            return None

        self._broadcast_text(self._SS_TEXT, text)

    def notify_sr_text(self, text):
        """ notify speech recognition result

        :param text: recognition text; empty text is ignored
        """
        if not text:
            return None

        self._broadcast_text(self._SR_TEXT, text)
00293 
00294 
class WebAudioMonotorServer(object):
    """ AudioPacketServer class

    Runs a tornado application (index page, static files and the
    websocket endpoint) on a background thread and exposes thin wrappers
    around the session factory for sending packets and notifications.
    """
    def __init__(self, host, port, debug):
        """ init function

        :param host: host name used to build the logged ws:// url
        :param port: port number the application listens on
        :param debug: accepted for compatibility; not used by this class
        """
        self._thread = None
        self._factory = None
        self._instance = None
        self._host = host
        self._port = port
        self._url = 'ws://{host}:{port}'.format(
            host=host,
            port=port
        )
        self._recv_auido_packet_cb = None
        self._recv_command_packet_cb = None

    def start(self):
        """ start websocket server """
        logger.info('start websocket server at %s', self._url)
        self._factory = WebAudioMonitorServerFactory(
            self._url,
            self._recv_auido_packet,
            self._recv_command_packet
        )

        # locate the web resources shipped with the ros package
        rp = rospkg.RosPack()
        audiomonitor_path = rp.get_path('rospeex_webaudiomonitor')
        path = os.path.join(audiomonitor_path, 'www')
        static_path = os.path.join(path, 'static')

        settings = {
            'static_path': static_path,
            'template_path': path
        }
        handlers = [
            (r'/', MainHandler),
            (r'/static/(.*)', StaticFileHandler, {'path': static_path}),
            (r'/websocket', MyWebSocketHandler, dict(factory=self._factory))
        ]
        app = tornado.web.Application(handlers, **settings)
        # NOTE(review): only the port is passed, so the host given to
        # __init__ does not restrict the listening address -- confirm.
        app.listen(self._port)

        # run the io loop on its own thread so start() returns immediately
        self._instance = tornado.ioloop.IOLoop.instance()
        self._thread = threading.Thread(
            target=self._instance.start
        )
        self._thread.start()

    def stop(self):
        """ stop websocket server

        Does nothing if the server was never started.
        """
        if self._instance is None:
            return

        # stop() is requested from a foreign thread; add_callback is the
        # io loop method that is safe to call from another thread
        self._instance.add_callback(self._instance.stop)
        if self._thread is not None:
            self._thread.join(0.1)

    def _recv_auido_packet(self, packet):
        """ pass a received audio packet to the registered callback

        :param packet: audio payload
        """
        if self._recv_auido_packet_cb:
            self._recv_auido_packet_cb(packet)

    def _recv_command_packet(self, packet):
        """ pass a received command packet to the registered callback

        :param packet: command text
        """
        if self._recv_command_packet_cb:
            self._recv_command_packet_cb(packet)

    def set_websocket_callback(self, recv_audio_cb, recv_command_cb):
        """ set websocket server callbacks

        :param recv_audio_cb: called with each received audio payload
        :param recv_command_cb: called with each received command text
        """
        self._recv_auido_packet_cb = recv_audio_cb
        self._recv_command_packet_cb = recv_command_cb

    def send_packet(self, packet):
        """ send an audio packet to the connected clients

        :param packet: payload to send
        """
        self._factory.send_packet(packet)

    def notify_vad_connected(self):
        """ notify vad connected
        """
        self._factory.notify_vad_connected()

    def notify_vad_disconnected(self):
        """ notify vad disconnected
        """
        self._factory.notify_vad_disconnected()

    def notify_epd_message(self, message):
        """ notify epd message

        :param message: message text
        """
        self._factory.notify_epd_message(message)

    def notify_sr_text(self, text):
        """ notify speech recognition text

        :param text: recognition text
        """
        self._factory.notify_sr_text(text)

    def notify_ss_text(self, text):
        """ notify speech synthesis text

        :param text: synthesis text
        """
        self._factory.notify_ss_text(text)
00401         """
00402         self._factory.notify_ss_text(text)
00403 
00404 
def main():
    """ run a stand-alone debug server until Enter is pressed

    The server is created with host 127.0.0.1 and port 9000 and is
    stopped again before returning, so its io loop thread does not keep
    the process alive after the prompt is answered.
    """
    # setup logger
    logging_util.setup_logging(default_level=logging.DEBUG)
    server = WebAudioMonotorServer(
        host='127.0.0.1',
        port=9000,
        debug=True
    )
    server.start()

    # raw_input only exists on Python 2; fall back to input elsewhere
    try:
        wait_for_enter = raw_input
    except NameError:
        wait_for_enter = input

    logger.info('wait for enter')
    try:
        wait_for_enter()
    finally:
        server.stop()
00415     logger.info('wait for enter')
00416     raw_input()
00417 
00418 
if __name__ == '__main__':
    # hand main()'s return value to the interpreter as the exit status
    exit_status = main()
    sys.exit(exit_status)


rospeex_webaudiomonitor
Author(s): Komei Sugiura
autogenerated on Thu Apr 20 2017 03:09:01