11 from std_msgs.msg
import String
13 from twisted.internet
import reactor
14 from autobahn.twisted.websocket
import (WebSocketClientProtocol,
15 WebSocketClientFactory)
19 from twisted.python
import log
20 log.startLogging(sys.stderr)
30 test_client_received = []
31 class TestClientProtocol(WebSocketClientProtocol):
33 self.sendMessage(json.dumps({
36 'compression':
'cbor-raw',
39 def onMessage(self, payload, binary):
40 test_client_received.append(payload)
42 protocol = os.environ.get(
'PROTOCOL')
43 port = rospy.get_param(
'/rosbridge_websocket/actual_port')
44 url = protocol +
'://127.0.0.1:' + str(port)
46 factory = WebSocketClientFactory(url)
47 factory.protocol = TestClientProtocol
48 reactor.connectTCP(
'127.0.0.1', port, factory)
50 pub = rospy.Publisher(TOPIC, String, queue_size=1)
52 rospy.sleep(WARMUP_DELAY)
53 pub.publish(String(STRING))
54 rospy.sleep(TIME_LIMIT)
56 reactor.callInThread(publish_timer)
59 self.assertEqual(len(test_client_received), 1)
60 websocket_message = decode_cbor(test_client_received[0])
61 self.assertEqual(websocket_message[
'topic'], TOPIC)
63 String(STRING).serialize(buff)
64 self.assertEqual(websocket_message[
'msg'][
'bytes'], buff.getvalue())
67 PKG =
'rosbridge_server' 68 NAME =
'test_websocket_cbor_raw' 70 if __name__ ==
'__main__':
73 while not rospy.is_shutdown()
and not rospy.has_param(
'/rosbridge_websocket/actual_port'):
76 rostest.rosrun(PKG, NAME, TestWebsocketCborRaw)