10 from std_msgs.msg
import String
12 from twisted.internet
import reactor
13 from autobahn.twisted.websocket
import (WebSocketClientProtocol,
14 WebSocketClientFactory)
16 from twisted.python
import log
17 log.startLogging(sys.stderr)
25 A_STRING =
'A' * MSG_SIZE
26 B_STRING =
'B' * MSG_SIZE
36 'type':
'std_msgs/String',
42 'type':
'std_msgs/String',
53 msg = json.dumps(msg_dict).encode(
'utf-8')
54 for _
in range(times):
58 self.__class__.received.append(payload)
63 protocol = os.environ.get(
'PROTOCOL')
64 port = rospy.get_param(
'/rosbridge_websocket/actual_port')
65 url = protocol +
'://127.0.0.1:' + str(port)
67 factory = WebSocketClientFactory(url)
68 factory.protocol = TestClientProtocol
69 reactor.connectTCP(
'127.0.0.1', port, factory)
72 TestClientProtocol.received = []
73 rospy.Subscriber(A_TOPIC, String, ros_received.append)
74 pub = rospy.Publisher(B_TOPIC, String, queue_size=NUM_MSGS)
77 rospy.sleep(WARMUP_DELAY)
78 msg = String(B_STRING)
79 for _
in range(NUM_MSGS):
83 rospy.sleep(WARMUP_DELAY + TIME_LIMIT)
86 reactor.callInThread(publish_timer)
87 reactor.callInThread(shutdown_timer)
90 for received
in TestClientProtocol.received:
91 msg = json.loads(received)
92 self.assertEqual(
'publish', msg[
'op'])
93 self.assertEqual(B_TOPIC, msg[
'topic'])
94 self.assertEqual(B_STRING, msg[
'msg'][
'data'])
95 self.assertEqual(NUM_MSGS, len(TestClientProtocol.received))
97 for msg
in ros_received:
98 self.assertEqual(A_STRING, msg.data)
99 self.assertEqual(NUM_MSGS, len(ros_received))
102 PKG =
'rosbridge_server' 103 NAME =
'test_websocket_smoke' 105 if __name__ ==
'__main__':
106 rospy.init_node(NAME)
108 while not rospy.is_shutdown()
and not rospy.has_param(
'/rosbridge_websocket/actual_port'):
111 rostest.rosrun(PKG, NAME, TestWebsocketSmoke)
def _sendDict(self, msg_dict, times=1)
def onMessage(self, payload, binary)