mqtt_bridge_node_test.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import logging
3 import sys
4 import time
5 import unittest
6 from logging import getLogger
7 
8 import msgpack
9 from mock import MagicMock
10 
11 import rosgraph
12 import rospy
13 import paho.mqtt.client as mqtt
14 from std_msgs.msg import Bool, String
15 
16 logging.basicConfig(stream=sys.stderr)
17 logger = getLogger(__name__)
18 logger.setLevel(logging.DEBUG)
19 
20 
21 class TestMqttBridge(unittest.TestCase):
22 
23  def setUp(self):
24  # mqtt
25  self.mqtt_callback_ping = MagicMock()
26  self.mqtt_callback_echo = MagicMock()
27  self.mqttc = mqtt.Client("client-id")
28  self.mqttc.connect("localhost", 1883)
29  self.mqttc.message_callback_add("ping", self.mqtt_callback_ping)
30  self.mqttc.message_callback_add("echo", self.mqtt_callback_echo)
31  self.mqttc.subscribe("ping")
32  self.mqttc.subscribe("echo")
33  self.mqttc.loop_start()
34  # ros
35  rospy.init_node('mqtt_bridge_test_node')
36  self.ros_callback_ping = MagicMock()
37  self.ros_callback_pong = MagicMock()
38  self.ros_callback_echo = MagicMock()
39  self.ros_callback_back = MagicMock()
40  self.subscriber_ping = rospy.Subscriber("/ping", Bool, self.ros_callback_ping)
41  self.subscriber_pong = rospy.Subscriber("/pong", Bool, self.ros_callback_pong)
42  self.subscriber_echo = rospy.Subscriber("/echo", String, self.ros_callback_echo)
43  self.subscriber_back = rospy.Subscriber("/back", String, self.ros_callback_back)
44 
45  def tearDown(self):
46  self.subscriber_ping.unregister()
47  self.subscriber_pong.unregister()
48  self.subscriber_echo.unregister()
49  self.subscriber_back.unregister()
50  self.mqttc.loop_stop()
51  self.mqttc.disconnect()
52 
53  def get_publisher(self, topic_path, msg_type, **kwargs):
54  # wait until the number of connections would be same as ros master
55  pub = rospy.Publisher(topic_path, msg_type, **kwargs)
56  num_subs = len(self._get_subscribers(topic_path))
57  for i in range(20):
58  num_cons = pub.get_num_connections()
59  if num_cons == num_subs:
60  return pub
61  time.sleep(0.1)
62  self.fail("failed to get publisher")
63 
64  def _get_subscribers(self, topic_path):
65  ros_master = rosgraph.Master('/rostopic')
66  topic_path = rosgraph.names.script_resolve_name('rostopic', topic_path)
67  state = ros_master.getSystemState()
68  subs = []
69  for sub in state[1]:
70  if sub[0] == topic_path:
71  subs.extend(sub[1])
72  return subs
73 
74  def _wait_callback(self, callback_func):
75  for i in range(10):
76  if callback_func.called:
77  return
78  time.sleep(0.1)
79  self.fail("callback doesn't be triggered")
80 
81  def test_ping_pong(self):
82  publisher = self.get_publisher("/ping", Bool, queue_size=1)
83  publisher.publish(Bool(True))
85  self.ros_callback_ping.assert_called_once_with(Bool(True))
87  self.ros_callback_pong.assert_called_once_with(Bool(True))
88  self.mqtt_callback_ping.assert_called_once()
89  msg = self.mqtt_callback_ping.call_args[0][2]
90  self.assertEqual(msg.topic, "ping")
91  self.assertEqual(msg.payload, msgpack.dumps({"data": True}))
92 
93  def test_echo_back(self):
94  publisher = self.get_publisher("/echo", String, queue_size=1)
95  publisher.publish(String("hello"))
97  self.ros_callback_echo.assert_called_once_with(String("hello"))
99  self.ros_callback_back.assert_called_once_with(String("hello"))
100  self.mqtt_callback_echo.assert_called_once()
101  msg = self.mqtt_callback_echo.call_args[0][2]
102  self.assertEqual(msg.topic, "echo")
103  self.assertEqual(msg.payload, msgpack.dumps({"data": "hello"}))
104 
105 
106 if __name__ == '__main__':
107  import rostest
108  rostest.rosrun('mqtt_bridge', 'mqtt_bridge_test', TestMqttBridge)
def get_publisher(self, topic_path, msg_type, kwargs)


mqtt_bridge
Author(s): Junya Hayashi
autogenerated on Mon Feb 15 2021 03:57:51