Go to the documentation of this file.
00001
00002
00003 import websocket
00004 import rospy
00005 import unittest
00006 import time
00007
class TestWebsocket(unittest.TestCase):
    """Round-trip checks against the ``websocket_echo`` endpoint.

    Every test gets its own connection to
    ``ws://localhost:9849/websocket_echo``: it is opened in ``setUp`` and
    closed in ``tearDown``.
    """

    def setUp(self):
        """Open a fresh websocket connection for the test about to run."""
        self.ws = websocket.create_connection("ws://localhost:9849/websocket_echo")

    def tearDown(self):
        """Close the connection opened in ``setUp``."""
        self.ws.close()

    def test_ok(self):
        """Check that text, ping and pong frames all come back unchanged."""
        # Text frames: each message must be returned verbatim, in order.
        for message in ("hello", "test", "hi"):
            self.ws.send(message)
            self.assertEqual(message, self.ws.recv())

        # Control frames are read with recv_frame() so the opcode can be
        # inspected.  RFC 6455 assigns opcode 9 (0x9) to ping and 10 (0xA)
        # to pong.
        #
        # The payloads are compared against bytes literals: frame data is
        # delivered as bytes on Python 3, where comparing it with a text
        # literal would always fail.  On Python 2 ``b"..."`` is a plain
        # ``str``, so the assertion is unchanged there.
        self.ws.ping("test ping")
        ping_echo = self.ws.recv_frame()
        self.assertEqual(9, ping_echo.opcode)
        self.assertEqual(b"test ping", ping_echo.data)

        self.ws.pong("test pong")
        pong_echo = self.ws.recv_frame()
        self.assertEqual(10, pong_echo.opcode)
        self.assertEqual(b"test pong", pong_echo.data)
00032
def _main():
    """Pause briefly, register the ROS node, then run the test case via rostest."""
    # NOTE(review): the one-second pause presumably gives the server under
    # test time to start listening -- confirm against the launch file.
    time.sleep(1)

    # rostest is imported only when the file is executed as a script.
    import rostest

    rospy.init_node('websocket_test')
    rostest.rosrun('async_web_server_cpp', 'websocket', TestWebsocket)


if __name__ == '__main__':
    _main()