6 from logging
import getLogger
9 from mock
import MagicMock
13 import paho.mqtt.client
as mqtt
14 from std_msgs.msg
import Bool, String
16 logging.basicConfig(stream=sys.stderr)
17 logger = getLogger(__name__)
18 logger.setLevel(logging.DEBUG)
27 self.
mqttc = mqtt.Client(
"client-id")
28 self.mqttc.connect(
"localhost", 1883)
31 self.mqttc.subscribe(
"ping")
32 self.mqttc.subscribe(
"echo")
33 self.mqttc.loop_start()
35 rospy.init_node(
'mqtt_bridge_test_node')
46 self.subscriber_ping.unregister()
47 self.subscriber_pong.unregister()
48 self.subscriber_echo.unregister()
49 self.subscriber_back.unregister()
50 self.mqttc.loop_stop()
51 self.mqttc.disconnect()
55 pub = rospy.Publisher(topic_path, msg_type, **kwargs)
58 num_cons = pub.get_num_connections()
59 if num_cons == num_subs:
62 self.fail(
"failed to get publisher")
65 ros_master = rosgraph.Master(
'/rostopic')
66 topic_path = rosgraph.names.script_resolve_name(
'rostopic', topic_path)
67 state = ros_master.getSystemState()
70 if sub[0] == topic_path:
76 if callback_func.called:
79 self.fail(
"callback doesn't be triggered")
83 publisher.publish(Bool(
True))
85 self.ros_callback_ping.assert_called_once_with(Bool(
True))
87 self.ros_callback_pong.assert_called_once_with(Bool(
True))
88 self.mqtt_callback_ping.assert_called_once()
89 msg = self.mqtt_callback_ping.call_args[0][2]
90 self.assertEqual(msg.topic,
"ping")
91 self.assertEqual(msg.payload, msgpack.dumps({
"data":
True}))
95 publisher.publish(String(
"hello"))
97 self.ros_callback_echo.assert_called_once_with(String(
"hello"))
99 self.ros_callback_back.assert_called_once_with(String(
"hello"))
100 self.mqtt_callback_echo.assert_called_once()
101 msg = self.mqtt_callback_echo.call_args[0][2]
102 self.assertEqual(msg.topic,
"echo")
103 self.assertEqual(msg.payload, msgpack.dumps({
"data":
"hello"}))
106 if __name__ ==
'__main__':
108 rostest.rosrun(
'mqtt_bridge',
'mqtt_bridge_test', TestMqttBridge)
def get_publisher(self, topic_path, msg_type, kwargs)
def _get_subscribers(self, topic_path)
def _wait_callback(self, callback_func)