test_smoke.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import json
3 import os
4 import rostest
5 import sys
6 import threading
7 import unittest
8 
9 import rospy
10 from std_msgs.msg import String
11 
12 from twisted.internet import reactor
13 from autobahn.twisted.websocket import (WebSocketClientProtocol,
14  WebSocketClientFactory)
15 
16 from twisted.python import log
17 log.startLogging(sys.stderr)
18 
19 # For consistency, the number of messages must not exceed the the protocol
20 # Subscriber queue_size.
21 NUM_MSGS = 10
22 MSG_SIZE = 10
23 A_TOPIC = '/a_topic'
24 B_TOPIC = '/b_topic'
25 A_STRING = 'A' * MSG_SIZE
26 B_STRING = 'B' * MSG_SIZE
27 WARMUP_DELAY = 1.0 # seconds
28 TIME_LIMIT = 5.0 # seconds
29 
30 
31 class TestClientProtocol(WebSocketClientProtocol):
32  def onOpen(self):
33  self._sendDict({
34  'op': 'subscribe',
35  'topic': B_TOPIC,
36  'type': 'std_msgs/String',
37  'queue_length': 0, # Test the roslib default.
38  })
39  self._sendDict({
40  'op': 'advertise',
41  'topic': A_TOPIC,
42  'type': 'std_msgs/String',
43  })
44  self._sendDict({
45  'op': 'publish',
46  'topic': A_TOPIC,
47  'msg': {
48  'data': A_STRING,
49  },
50  }, NUM_MSGS)
51 
52  def _sendDict(self, msg_dict, times=1):
53  msg = json.dumps(msg_dict).encode('utf-8')
54  for _ in range(times):
55  self.sendMessage(msg)
56 
57  def onMessage(self, payload, binary):
58  self.__class__.received.append(payload)
59 
60 
61 class TestWebsocketSmoke(unittest.TestCase):
62  def test_smoke(self):
63  protocol = os.environ.get('PROTOCOL')
64  port = rospy.get_param('/rosbridge_websocket/actual_port')
65  url = protocol + '://127.0.0.1:' + str(port)
66 
67  factory = WebSocketClientFactory(url)
68  factory.protocol = TestClientProtocol
69  reactor.connectTCP('127.0.0.1', port, factory)
70 
71  ros_received = []
72  TestClientProtocol.received = []
73  rospy.Subscriber(A_TOPIC, String, ros_received.append)
74  pub = rospy.Publisher(B_TOPIC, String, queue_size=NUM_MSGS)
75 
76  def publish_timer():
77  rospy.sleep(WARMUP_DELAY)
78  msg = String(B_STRING)
79  for _ in range(NUM_MSGS):
80  pub.publish(msg)
81 
82  def shutdown_timer():
83  rospy.sleep(WARMUP_DELAY + TIME_LIMIT)
84  reactor.stop()
85 
86  reactor.callInThread(publish_timer)
87  reactor.callInThread(shutdown_timer)
88  reactor.run()
89 
90  for received in TestClientProtocol.received:
91  msg = json.loads(received)
92  self.assertEqual('publish', msg['op'])
93  self.assertEqual(B_TOPIC, msg['topic'])
94  self.assertEqual(B_STRING, msg['msg']['data'])
95  self.assertEqual(NUM_MSGS, len(TestClientProtocol.received))
96 
97  for msg in ros_received:
98  self.assertEqual(A_STRING, msg.data)
99  self.assertEqual(NUM_MSGS, len(ros_received))
100 
101 
102 PKG = 'rosbridge_server'
103 NAME = 'test_websocket_smoke'
104 
105 if __name__ == '__main__':
106  rospy.init_node(NAME)
107 
108  while not rospy.is_shutdown() and not rospy.has_param('/rosbridge_websocket/actual_port'):
109  rospy.sleep(1.0)
110 
111  rostest.rosrun(PKG, NAME, TestWebsocketSmoke)
def _sendDict(self, msg_dict, times=1)
Definition: test_smoke.py:52
def onMessage(self, payload, binary)
Definition: test_smoke.py:57


rosbridge_server
Author(s): Jonathan Mace
autogenerated on Wed Jun 3 2020 03:55:18