00001
00002
00003
00004 import os
00005 import sys
00006 import threading
00007 import struct
00008 import logging
00009
00010
00011 import tornado
00012 import tornado.ioloop
00013 import tornado.web
00014 import tornado.websocket
00015 from tornado.ioloop import PeriodicCallback
00016 from tornado.web import StaticFileHandler
00017
00018
00019 import rospkg
00020
00021
00022 import logging_util
00023
00024
def _leading_int(component):
    """Return the leading decimal digits of *component* as an int.

    Pre-release components such as ``'0b1'`` or ``'dev1'`` would make a
    plain ``int()`` call raise; only the numeric prefix is used and a
    component without one counts as 0.
    """
    digits = ''
    for char in component:
        if not char.isdigit():
            break
        digits += char
    return int(digits) if digits else 0


# Pack the installed Tornado version into one comparable integer:
# major * 1000**2 + minor * 1000 + revision.  Only the first three
# components are considered so longer version strings cannot overflow
# the three-slot list.
version_list = [0, 0, 0]
for idx, value in enumerate(tornado.version.split('.')[:3]):
    version_list[idx] = _leading_int(value)
major, minor, revision = version_list
version = major * 1000**2 + minor * 1000 + revision

# Tornado releases older than 2.2.0 use the project's replacement handler.
if version < 2002000:
    from ws_tornado_210 import ModWebSocketHandler as WebSocketHandler
else:
    from tornado.websocket import WebSocketHandler


# Public API of this module (the class name spelling is kept as defined).
__all__ = [
    'WebAudioMonotorServer',
]


# Module-wide logger obtained from the project's logging helper.
logger = logging_util.get_logger(__file__)
00044
00045
class MainHandler(tornado.web.RequestHandler):
    """Request handler for the monitor's top page."""

    def get(self):
        """Answer a GET request by rendering the ``index.html`` template."""
        template_name = 'index.html'
        self.render(template_name)
00049
00050
class MyWebSocketHandler(WebSocketHandler):
    """Tornado websocket handler for one audio-monitor client.

    Each connection registers itself with the shared factory, forwards
    every received message to it and keeps the link alive with periodic
    pings.  A session whose pongs stay missing for too many timeout ticks
    is closed and unregistered.
    """

    # Interval between keep-alive pings [ms].
    _PING_INTERVAL_MS = 2000
    # Interval at which a still-missing pong is counted [ms].
    _TIMEOUT_INTERVAL_MS = 1000
    # Number of missed-pong ticks tolerated before the session is dropped.
    _MAX_TIMEOUT_COUNT = 100

    def initialize(self, factory):
        """Set up per-connection state.

        :param factory: object providing ``register_session``,
            ``unregister_session`` and ``add_recv_packet``.
        """
        logger.info('initialize handler')
        self.session_id = id(self)
        self.factory = factory
        self.is_open = False
        self.timeout_count = 0
        self.ping_callback = PeriodicCallback(
            self._send_ping, self._PING_INTERVAL_MS)
        self.timeout_callback = PeriodicCallback(
            self._check_timeout, self._TIMEOUT_INTERVAL_MS)

    def check_origin(self, origin):
        """Accept the connection regardless of *origin*.

        NOTE(review): returning True allows cross-origin pages to connect;
        confirm this is intended for the deployment.
        """
        return True

    def open(self):
        """Mark the session open, start pinging and register it."""
        logger.info('websocket handler opened')
        self.is_open = True
        self.ping_callback.start()
        self.factory.register_session(self)

    def write_message(self, message, binary=False):
        """Send *message* if the connection is still usable.

        The message is silently dropped when the handler is closed, when
        the underlying connection object is gone, or when the client has
        already terminated the connection.

        :param message: payload to send.
        :param binary: send as a binary frame when True.
        """
        connection = self.ws_connection
        if not self.is_open or connection is None:
            return
        if connection.client_terminated:
            return
        connection.write_message(message, binary)

    def on_message(self, payload):
        """Forward a received message to the factory.

        :param payload: received message; byte strings are treated as
            binary packets, everything else as text.
        """
        logger.debug('receive message')
        # ``bytes`` is ``str`` on Python 2, so this matches the previous
        # check there while also being correct on Python 3.
        is_binary = isinstance(payload, bytes)
        self.factory.add_recv_packet(payload, is_binary, self.session_id)

    def on_pong(self, data):
        """Reset the timeout bookkeeping when a pong arrives."""
        logger.debug('on pong [%s]', self.session_id)
        self.timeout_callback.stop()
        self.timeout_count = 0

    def _send_ping(self):
        """Send a ping carrying the session id and (re)arm the timeout."""
        logger.debug('send ping [%s]', self.session_id)
        ping_str = '{}'.format(self.session_id).encode('utf-8')
        # Restart so the timeout ticks are measured from this ping.
        self.timeout_callback.stop()
        self.timeout_callback.start()
        self.ping(ping_str)

    def _check_timeout(self):
        """Count a missed-pong tick and drop the session after too many."""
        logger.info('time out occured [%s]', self.session_id)
        self.timeout_count += 1
        if self.timeout_count > self._MAX_TIMEOUT_COUNT:
            self.close()
            self.timeout_callback.stop()
            self.factory.unregister_session(self)

    def on_close(self):
        """Stop the timers and unregister the session."""
        logger.info('websocket handler closed [%s]', self.session_id)
        self.ping_callback.stop()
        self.timeout_callback.stop()
        self.is_open = False
        self.factory.unregister_session(self)

    def close(self, *args, **kwargs):
        """Stop the timers and close the websocket.

        Extra arguments are passed through to the base class ``close`` so
        callers can still supply whatever that implementation accepts.
        """
        if self.is_open:
            self.ping_callback.stop()
            self.timeout_callback.stop()
            self.is_open = False
        # Start the lookup after *this* class; the previous
        # ``super(WebSocketHandler, self)`` skipped the websocket handler's
        # own ``close``.
        super(MyWebSocketHandler, self).close(*args, **kwargs)

    def on_finish(self):
        """Log the end of the request."""
        logger.info('finish connection [%s]', self.session_id)

    @staticmethod
    def _to_unicode(text):
        """Return *text* as unicode, decoding byte strings as UTF-8."""
        if isinstance(text, bytes):
            return text.decode('utf-8')
        return text

    def onVadConnected(self):
        """Tell the client that the VAD server is connected."""
        payload = '[VAD SERVER Connected]'.encode('utf-8')
        self.write_message(payload, False)

    def onVadDisconnected(self):
        """Tell the client that the VAD server is disconnected."""
        payload = '[VAD SERVER Disconnected]'.encode('utf-8')
        self.write_message(payload, False)

    def onEpdMessage(self, text):
        """Send *text* to the client with the ``[SS]`` prefix.

        :param text: message body (byte string or unicode).
        """
        # Format in unicode so non-ASCII text cannot trigger an implicit
        # ASCII conversion before the explicit UTF-8 encode.
        payload = u'[SS] {text}'.format(text=self._to_unicode(text))
        self.write_message(payload.encode('utf-8'), False)

    def onSRReceived(self, text):
        """Send *text* to the client with the ``[SR_TEXT]`` prefix.

        :param text: message body (byte string or unicode).
        """
        payload = u'[SR_TEXT] ' + self._to_unicode(text)
        self.write_message(payload.encode('utf-8'), False)

    def onSSReceived(self, text):
        """Send *text* to the client with the ``[SS_TEXT]`` prefix.

        :param text: message body (byte string or unicode).
        """
        payload = u'[SS_TEXT] ' + self._to_unicode(text)
        self.write_message(payload.encode('utf-8'), False)

    def sendMessage(self, data, binary):
        """Send *data* through :meth:`write_message`.

        :param data: payload to send.
        :param binary: send as a binary frame when True.
        """
        self.write_message(data, binary)
00142
00143
class WebAudioMonitorServerFactory(object):
    """Registry of websocket sessions and dispatcher for their packets.

    Received packets are accepted only from the session at the head of
    the registration list; outgoing packets and notifications go to every
    registered session.
    """
    # One-byte headers prepended to binary packets.
    _VAD_HEADER = 22
    _SS_TEXT = 30
    _SR_TEXT = 31

    def __init__(self, uri, recv_audio_packet_cb, recv_command_packet_cb):
        """Create an empty registry.

        :param uri: server URI; stored but not otherwise used here.
        :param recv_audio_packet_cb: called with the payload of each
            accepted binary packet (header byte removed).
        :param recv_command_packet_cb: called with each accepted text
            packet.
        """
        self._uri = uri
        self._recv_audio_packet_cb = recv_audio_packet_cb
        self._recv_command_packet_cb = recv_command_packet_cb
        self._clients = {}
        self._vad_connected = False
        self._session_id_list = []
        self._mutex = threading.Lock()

    def add_recv_packet(self, packet, is_binary, session_id):
        """Process a packet received from a session.

        Only packets from the first registered session are processed;
        packets from other known sessions are ignored.

        :param packet: received payload.
        :param is_binary: True when *packet* is a binary frame.
        :param session_id: id of the sending session.
        """
        # unregister_session rebinds the list instead of mutating it, so
        # one local reference is a consistent snapshot for both checks.
        session_ids = self._session_id_list
        if session_id not in session_ids:
            logger.debug('target session [%s] is not found.', session_id)
            return

        if session_id == session_ids[0]:
            self._recv_packet(packet, is_binary)

    def _recv_packet(self, packet, is_binary):
        """Dispatch a received packet to the matching callback.

        :param packet: received payload.
        :param is_binary: True when *packet* is a binary frame.
        """
        if not is_binary:
            logger.debug('receive string data [%s]', packet)
            self._recv_command_packet_cb(packet)
            return

        logger.debug('receive binary data. length [%d]', len(packet))
        conv_packet = bytearray(packet)
        if not conv_packet:
            # No header byte to inspect; indexing would raise IndexError.
            logger.warning('invalid packet. empty binary payload')
            return

        header = conv_packet[0]
        if header == self._VAD_HEADER:
            self._recv_audio_packet_cb(conv_packet[1:])
        else:
            logger.warning('invalid header packet. header [%d]', header)

    def register_session(self, session):
        """Register a new session.

        :param session: handler object exposing ``session_id``.
        """
        session_id = session.session_id
        logger.info('add session [%s]', session_id)

        with self._mutex:
            self._clients[session_id] = session
            self._session_id_list.append(session_id)

        # Called outside the lock: notify_vad_connected acquires it itself.
        if self._vad_connected:
            self.notify_vad_connected()

    def unregister_session(self, session):
        """Remove a session and close its handler.

        :param session: handler object exposing ``session_id``.
        """
        session_id = session.session_id
        logger.info('delete session [%s]', session_id)

        with self._mutex:
            self._session_id_list = [
                sid for sid in self._session_id_list if sid != session_id
            ]
            client = self._clients.pop(session_id, None)

        # Close after releasing the lock so the handler's close path never
        # runs while the non-reentrant mutex is held.
        if client is not None:
            client.close()

        if self._vad_connected:
            self.notify_vad_connected()

    def send_packet(self, packet):
        """Send *packet* to every session with the VAD header prepended.

        :param packet: binary payload.
        """
        send_packet = struct.pack('B', self._VAD_HEADER)
        send_packet += packet
        with self._mutex:
            for client in self._clients.values():
                logger.debug('send packet len: %d', len(send_packet))
                client.sendMessage(send_packet, True)

    def notify_vad_connected(self):
        """Record that VAD is connected and tell every session."""
        with self._mutex:
            self._vad_connected = True
            for key, client in self._clients.items():
                logger.debug('notify to valid client [%s]', key)
                client.onVadConnected()

    def notify_vad_disconnected(self):
        """Record that VAD is disconnected and tell every session."""
        with self._mutex:
            self._vad_connected = False
            for client in self._clients.values():
                client.onVadDisconnected()

    def notify_epd_message(self, message):
        """Forward an EPD message to every session.

        :param message: message text; empty messages are ignored.
        """
        if not message:
            return None

        with self._mutex:
            for client in self._clients.values():
                client.onEpdMessage(message)

    def _create_send_text_packet(self, header, text):
        """Build a binary packet of one header byte plus UTF-8 text.

        :param header: header value (0-255).
        :param text: UTF-8 byte string or unicode text.
        :returns: the packed packet as a byte string.
        """
        if isinstance(text, bytes):
            # Round-trip so invalid UTF-8 is still rejected here.
            encoded_text = text.decode('utf-8').encode('utf-8')
        else:
            encoded_text = text.encode('utf-8')
        return struct.pack('B', header) + encoded_text

    def _broadcast_binary(self, send_packet):
        """Send an already-built binary packet to every session."""
        with self._mutex:
            for client in self._clients.values():
                client.sendMessage(send_packet, True)

    def notify_ss_text(self, text):
        """Send a speech synthesis text to every session.

        :param text: text to send; empty text is ignored.
        """
        if not text:
            return None

        self._broadcast_binary(
            self._create_send_text_packet(self._SS_TEXT, text))

    def notify_sr_text(self, text):
        """Send a speech recognition text to every session.

        :param text: text to send; empty text is ignored.
        """
        if not text:
            return None

        self._broadcast_binary(
            self._create_send_text_packet(self._SR_TEXT, text))
00293
00294
class WebAudioMonotorServer(object):
    """Websocket server front-end for the web audio monitor.

    ``start`` builds the Tornado application and runs the IOLoop in a
    separate thread; the ``notify_*`` / ``send_packet`` methods forward to
    the session factory created by ``start``.
    """

    def __init__(self, host, port, debug):
        """Store the server settings.

        :param host: host name used to build the logged websocket URL.
        :param port: TCP port the application listens on.
        :param debug: accepted for interface compatibility; not used.
        """
        self._thread = None
        self._factory = None
        self._instance = None
        self._host = host
        self._port = port
        self._url = 'ws://{host}:{port}'.format(
            host=host,
            port=port
        )
        self._recv_auido_packet_cb = None
        self._recv_command_packet_cb = None

    def start(self):
        """Start the websocket server in a background thread."""
        logger.info('start websocket server at %s', self._url)
        self._factory = WebAudioMonitorServerFactory(
            self._url,
            self._recv_auido_packet,
            self._recv_command_packet
        )

        # Static files and templates live under the ROS package's www dir.
        rp = rospkg.RosPack()
        audiomonitor_path = rp.get_path('rospeex_webaudiomonitor')
        path = os.path.join(audiomonitor_path, 'www')
        static_path = os.path.join(path, 'static')

        settings = {
            'static_path': static_path,
            'template_path': path
        }
        handlers = [
            (r'/', MainHandler),
            (r'/static/(.*)', StaticFileHandler, {'path': static_path}),
            (r'/websocket', MyWebSocketHandler, dict(factory=self._factory))
        ]
        app = tornado.web.Application(handlers, **settings)
        # Only the port is passed; the host is not used for binding.
        app.listen(self._port)

        self._instance = tornado.ioloop.IOLoop.instance()
        self._thread = threading.Thread(
            target=self._instance.start
        )
        self._thread.start()

    def stop(self):
        """Stop the websocket server.

        Does nothing when the server was never started.
        """
        if self._instance is None:
            return

        # Ask the loop to stop itself via add_callback instead of calling
        # stop() directly, since this method runs outside the loop thread.
        self._instance.add_callback(self._instance.stop)
        if self._thread is not None:
            self._thread.join(0.1)

    def _recv_auido_packet(self, packet):
        """Pass a received audio packet to the registered callback.

        :param packet: audio payload.
        """
        if self._recv_auido_packet_cb:
            self._recv_auido_packet_cb(packet)

    def _recv_command_packet(self, packet):
        """Pass a received command packet to the registered callback.

        :param packet: command payload.
        """
        if self._recv_command_packet_cb:
            self._recv_command_packet_cb(packet)

    def set_websocket_callback(self, recv_audio_cb, recv_command_cb):
        """Set the callbacks for packets received over the websocket.

        :param recv_audio_cb: called with each received audio packet.
        :param recv_command_cb: called with each received command packet.
        """
        self._recv_auido_packet_cb = recv_audio_cb
        self._recv_command_packet_cb = recv_command_cb

    def send_packet(self, packet):
        """Send a binary packet to the connected sessions.

        :param packet: binary payload.
        """
        self._factory.send_packet(packet)

    def notify_vad_connected(self):
        """Notify the sessions that VAD is connected."""
        self._factory.notify_vad_connected()

    def notify_vad_disconnected(self):
        """Notify the sessions that VAD is disconnected."""
        self._factory.notify_vad_disconnected()

    def notify_epd_message(self, message):
        """Forward an EPD message to the sessions.

        :param message: message text.
        """
        self._factory.notify_epd_message(message)

    def notify_sr_text(self, text):
        """Forward a speech recognition text to the sessions.

        :param text: recognized text.
        """
        self._factory.notify_sr_text(text)

    def notify_ss_text(self, text):
        """Forward a speech synthesis text to the sessions.

        :param text: synthesized text.
        """
        self._factory.notify_ss_text(text)
00403
00404
def main():
    """Run a stand-alone server on 127.0.0.1:9000 until Enter is pressed."""
    logging_util.setup_logging(default_level=logging.DEBUG)
    server = WebAudioMonotorServer(
        host='127.0.0.1',
        port=9000,
        debug=True
    )
    server.start()

    try:
        logger.info('wait for enter')
        # raw_input exists only on Python 2; fall back to input elsewhere.
        try:
            read_line = raw_input
        except NameError:
            read_line = input
        read_line()
    finally:
        # Without this the IOLoop thread keeps the process alive.
        server.stop()


if __name__ == '__main__':
    sys.exit(main())