bridge.py
Go to the documentation of this file.
1 # -*- coding: utf-8 -*-
2 from __future__ import absolute_import
3 
4 from abc import ABCMeta, abstractmethod
5 
6 import inject
7 import paho.mqtt.client as mqtt
8 import rospy
9 
10 from .util import lookup_object, extract_values, populate_instance
11 
12 
13 def create_bridge(factory, msg_type, topic_from, topic_to, **kwargs):
14  u""" bridge generator function
15 
16  :param (str|class) factory: Bridge class
17  :param (str|class) msg_type: ROS message type
18  :param str topic_from: incoming topic path
19  :param str topic_to: outgoing topic path
20  :param (float|None) frequency: publish frequency
21  :return Bridge: bridge object
22  """
23  if isinstance(factory, basestring):
24  factory = lookup_object(factory)
25  if not issubclass(factory, Bridge):
26  raise ValueError("factory should be Bridge subclass")
27  if isinstance(msg_type, basestring):
28  msg_type = lookup_object(msg_type)
29  if not issubclass(msg_type, rospy.Message):
30  raise TypeError(
31  "msg_type should be rospy.Message instance or its string"
32  "reprensentation")
33  return factory(
34  topic_from=topic_from, topic_to=topic_to, msg_type=msg_type, **kwargs)
35 
36 
37 class Bridge(object):
38  u""" Bridge base class
39 
40  :param mqtt.Client _mqtt_client: MQTT client
41  :param _serialize: message serialize callable
42  :param _deserialize: message deserialize callable
43  """
44  __metaclass__ = ABCMeta
45 
46  _mqtt_client = inject.attr(mqtt.Client)
47  _serialize = inject.attr('serializer')
48  _deserialize = inject.attr('deserializer')
49  _extract_private_path = inject.attr('mqtt_private_path_extractor')
50 
51 
53  u""" Bridge from ROS topic to MQTT
54 
55  :param str topic_from: incoming ROS topic path
56  :param str topic_to: outgoing MQTT topic path
57  :param class msg_type: subclass of ROS Message
58  :param (float|None) frequency: publish frequency
59  """
60 
61  def __init__(self, topic_from, topic_to, msg_type, frequency=None):
62  self._topic_from = topic_from
63  self._topic_to = self._extract_private_path(topic_to)
64  self._last_published = rospy.get_time()
65  self._interval = 0 if frequency is None else 1.0 / frequency
66  rospy.Subscriber(topic_from, msg_type, self._callback_ros)
67 
68  def _callback_ros(self, msg):
69  rospy.logdebug("ROS received from {}".format(self._topic_from))
70  now = rospy.get_time()
71  if now - self._last_published > self._interval:
72  self._publish(msg)
73  self._last_published = now
74 
75  def _publish(self, msg):
76  payload = bytearray(self._serialize(extract_values(msg)))
77  self._mqtt_client.publish(topic=self._topic_to, payload=payload)
78 
79 
81  u""" Bridge from MQTT to ROS topic
82 
83  :param str topic_from: incoming MQTT topic path
84  :param str topic_to: outgoing ROS topic path
85  :param class msg_type: subclass of ROS Message
86  :param (float|None) frequency: publish frequency
87  :param int queue_size: ROS publisher's queue size
88  """
89 
90  def __init__(self, topic_from, topic_to, msg_type, frequency=None,
91  queue_size=10):
92  self._topic_from = self._extract_private_path(topic_from)
93  self._topic_to = topic_to
94  self._msg_type = msg_type
95  self._queue_size = queue_size
96  self._last_published = rospy.get_time()
97  self._interval = None if frequency is None else 1.0 / frequency
98 
99  self._mqtt_client.subscribe(topic_from)
100  self._mqtt_client.message_callback_add(topic_from, self._callback_mqtt)
101  self._publisher = rospy.Publisher(
102  self._topic_to, self._msg_type, queue_size=self._queue_size)
103 
104  def _callback_mqtt(self, client, userdata, mqtt_msg):
105  u""" callback from MQTT
106 
107  :param mqtt.Client client: MQTT client used in connection
108  :param userdata: user defined data
109  :param mqtt.MQTTMessage mqtt_msg: MQTT message
110  """
111  rospy.logdebug("MQTT received from {}".format(mqtt_msg.topic))
112  now = rospy.get_time()
113 
114  if self._interval is None or now - self._last_published >= self._interval:
115  try:
116  ros_msg = self._create_ros_message(mqtt_msg)
117  self._publisher.publish(ros_msg)
118  self._last_published = now
119  except Exception as e:
120  rospy.logerr(e)
121 
122  def _create_ros_message(self, mqtt_msg):
123  u""" create ROS message from MQTT payload
124 
125  :param mqtt.Message mqtt_msg: MQTT Message
126  :return rospy.Message: ROS Message
127  """
128  msg_dict = self._deserialize(mqtt_msg.payload)
129  return populate_instance(msg_dict, self._msg_type())
130 
131 
132 __all__ = ['register_bridge_factory', 'create_bridge', 'Bridge',
133  'RosToMqttBridge', 'MqttToRosBridge']
def _callback_mqtt(self, client, userdata, mqtt_msg)
Definition: bridge.py:104
def lookup_object(object_path, package='mqtt_bridge')
Definition: util.py:8
def __init__(self, topic_from, topic_to, msg_type, frequency=None, queue_size=10)
Definition: bridge.py:91
def create_bridge(factory, msg_type, topic_from, topic_to, kwargs)
Definition: bridge.py:13
def __init__(self, topic_from, topic_to, msg_type, frequency=None)
Definition: bridge.py:61
def _create_ros_message(self, mqtt_msg)
Definition: bridge.py:122
def _callback_ros(self, msg)
Definition: bridge.py:68


mqtt_bridge
Author(s): Junya Hayashi
autogenerated on Thu Jun 6 2019 19:18:59