bridge.py
Go to the documentation of this file.
1 from abc import ABCMeta
2 from typing import Optional, Type, Dict, Union
3 
4 import inject
5 import paho.mqtt.client as mqtt
6 import rospy
7 
8 from .util import lookup_object, extract_values, populate_instance
9 
10 
11 def create_bridge(factory: Union[str, "Bridge"], msg_type: Union[str, Type[rospy.Message]], topic_from: str,
12  topic_to: str, frequency: Optional[float] = None, **kwargs) -> "Bridge":
13  """ generate bridge instance using factory callable and arguments. if `factory` or `meg_type` is provided as string,
14  this function will convert it to a corresponding object.
15  """
16  if isinstance(factory, str):
17  factory = lookup_object(factory)
18  if not issubclass(factory, Bridge):
19  raise ValueError("factory should be Bridge subclass")
20  if isinstance(msg_type, str):
21  msg_type = lookup_object(msg_type)
22  if not issubclass(msg_type, rospy.Message):
23  raise TypeError(
24  "msg_type should be rospy.Message instance or its string"
25  "reprensentation")
26  return factory(
27  topic_from=topic_from, topic_to=topic_to, msg_type=msg_type, frequency=frequency, **kwargs)
28 
29 
30 class Bridge(object, metaclass=ABCMeta):
31  """ Bridge base class """
32  _mqtt_client = inject.attr(mqtt.Client)
33  _serialize = inject.attr('serializer')
34  _deserialize = inject.attr('deserializer')
35  _extract_private_path = inject.attr('mqtt_private_path_extractor')
36 
37 
39  """ Bridge from ROS topic to MQTT
40 
41  bridge ROS messages on `topic_from` to MQTT topic `topic_to`. expect `msg_type` ROS message type.
42  """
43 
44  def __init__(self, topic_from: str, topic_to: str, msg_type: rospy.Message, frequency: Optional[float] = None):
45  self._topic_from = topic_from
46  self._topic_to = self._extract_private_path(topic_to)
47  self._last_published = rospy.get_time()
48  self._interval = 0 if frequency is None else 1.0 / frequency
49  rospy.Subscriber(topic_from, msg_type, self._callback_ros)
50 
51  def _callback_ros(self, msg: rospy.Message):
52  rospy.logdebug("ROS received from {}".format(self._topic_from))
53  now = rospy.get_time()
54  if now - self._last_published >= self._interval:
55  self._publish(msg)
56  self._last_published = now
57 
58  def _publish(self, msg: rospy.Message):
59  payload = self._serialize(extract_values(msg))
60  self._mqtt_client.publish(topic=self._topic_to, payload=payload)
61 
62 
64  """ Bridge from MQTT to ROS topic
65 
66  bridge MQTT messages on `topic_from` to ROS topic `topic_to`. MQTT messages will be converted to `msg_type`.
67  """
68 
69  def __init__(self, topic_from: str, topic_to: str, msg_type: Type[rospy.Message],
70  frequency: Optional[float] = None, queue_size: int = 10):
71  self._topic_from = self._extract_private_path(topic_from)
72  self._topic_to = topic_to
73  self._msg_type = msg_type
74  self._queue_size = queue_size
75  self._last_published = rospy.get_time()
76  self._interval = None if frequency is None else 1.0 / frequency
77  # Adding the correct topic to subscribe to
78  self._mqtt_client.subscribe(self._topic_from)
79  self._mqtt_client.message_callback_add(self._topic_from, self._callback_mqtt)
80  self._publisher = rospy.Publisher(
81  self._topic_to, self._msg_type, queue_size=self._queue_size)
82 
83  def _callback_mqtt(self, client: mqtt.Client, userdata: Dict, mqtt_msg: mqtt.MQTTMessage):
84  """ callback from MQTT """
85  rospy.logdebug("MQTT received from {}".format(mqtt_msg.topic))
86  now = rospy.get_time()
87 
88  if self._interval is None or now - self._last_published >= self._interval:
89  try:
90  ros_msg = self._create_ros_message(mqtt_msg)
91  self._publisher.publish(ros_msg)
92  self._last_published = now
93  except Exception as e:
94  rospy.logerr(e)
95 
96  def _create_ros_message(self, mqtt_msg: mqtt.MQTTMessage) -> rospy.Message:
97  """ create ROS message from MQTT payload """
98  # Hack to enable both, messagepack and json deserialization.
99  if self._serialize.__name__ == "packb":
100  msg_dict = self._deserialize(mqtt_msg.payload, raw=False)
101  else:
102  msg_dict = self._deserialize(mqtt_msg.payload)
103  return populate_instance(msg_dict, self._msg_type())
104 
105 
106 __all__ = ['create_bridge', 'Bridge', 'RosToMqttBridge', 'MqttToRosBridge']
def lookup_object
Definition: util.py:7


mqtt_bridge
Author(s): Junya Hayashi
autogenerated on Mon Feb 15 2021 03:57:51