test_cbor_raw.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import json
3 import io
4 import os
5 import rostest
6 import sys
7 import threading
8 import unittest
9 
10 import rospy
11 from std_msgs.msg import String
12 
13 from twisted.internet import reactor
14 from autobahn.twisted.websocket import (WebSocketClientProtocol,
15  WebSocketClientFactory)
16 
17 from rosbridge_library.util.cbor import loads as decode_cbor
18 
19 from twisted.python import log
20 log.startLogging(sys.stderr)
21 
22 TOPIC = '/b_topic'
23 STRING = 'B' * 10000
24 WARMUP_DELAY = 1.0 # seconds
25 TIME_LIMIT = 5.0 # seconds
26 
27 
28 class TestWebsocketCborRaw(unittest.TestCase):
29  def test_cbor_raw(self):
30  test_client_received = []
31  class TestClientProtocol(WebSocketClientProtocol):
32  def onOpen(self):
33  self.sendMessage(json.dumps({
34  'op': 'subscribe',
35  'topic': TOPIC,
36  'compression': 'cbor-raw',
37  }))
38 
39  def onMessage(self, payload, binary):
40  test_client_received.append(payload)
41 
42  protocol = os.environ.get('PROTOCOL')
43  port = rospy.get_param('/rosbridge_websocket/actual_port')
44  url = protocol + '://127.0.0.1:' + str(port)
45 
46  factory = WebSocketClientFactory(url)
47  factory.protocol = TestClientProtocol
48  reactor.connectTCP('127.0.0.1', port, factory)
49 
50  pub = rospy.Publisher(TOPIC, String, queue_size=1)
51  def publish_timer():
52  rospy.sleep(WARMUP_DELAY)
53  pub.publish(String(STRING))
54  rospy.sleep(TIME_LIMIT)
55  reactor.stop()
56  reactor.callInThread(publish_timer)
57  reactor.run()
58 
59  self.assertEqual(len(test_client_received), 1)
60  websocket_message = decode_cbor(test_client_received[0])
61  self.assertEqual(websocket_message['topic'], TOPIC)
62  buff = io.BytesIO()
63  String(STRING).serialize(buff)
64  self.assertEqual(websocket_message['msg']['bytes'], buff.getvalue())
65 
66 
67 PKG = 'rosbridge_server'
68 NAME = 'test_websocket_cbor_raw'
69 
70 if __name__ == '__main__':
71  rospy.init_node(NAME)
72 
73  while not rospy.is_shutdown() and not rospy.has_param('/rosbridge_websocket/actual_port'):
74  rospy.sleep(1.0)
75 
76  rostest.rosrun(PKG, NAME, TestWebsocketCborRaw)


rosbridge_server
Author(s): Jonathan Mace
autogenerated on Wed Jun 3 2020 03:55:18