1 from abc
import ABCMeta
2 from typing
import Optional, Type, Dict, Union
5 import paho.mqtt.client
as mqtt
8 from .util
import lookup_object, extract_values, populate_instance
11 def create_bridge(factory: Union[str,
"Bridge"], msg_type: Union[str, Type[rospy.Message]], topic_from: str,
12 topic_to: str, frequency: Optional[float] =
None, **kwargs) ->
"Bridge":
13 """ generate bridge instance using factory callable and arguments. if `factory` or `meg_type` is provided as string, 14 this function will convert it to a corresponding object. 16 if isinstance(factory, str):
18 if not issubclass(factory, Bridge):
19 raise ValueError(
"factory should be Bridge subclass")
20 if isinstance(msg_type, str):
22 if not issubclass(msg_type, rospy.Message):
24 "msg_type should be rospy.Message instance or its string" 27 topic_from=topic_from, topic_to=topic_to, msg_type=msg_type, frequency=frequency, **kwargs)
30 class Bridge(object, metaclass=ABCMeta):
31 """ Bridge base class """ 32 _mqtt_client = inject.attr(mqtt.Client)
33 _serialize = inject.attr(
'serializer')
34 _deserialize = inject.attr(
'deserializer')
35 _extract_private_path = inject.attr(
'mqtt_private_path_extractor')
39 """ Bridge from ROS topic to MQTT 41 bridge ROS messages on `topic_from` to MQTT topic `topic_to`. expect `msg_type` ROS message type. 44 def __init__(self, topic_from: str, topic_to: str, msg_type: rospy.Message, frequency: Optional[float] =
None):
48 self.
_interval = 0
if frequency
is None else 1.0 / frequency
52 rospy.logdebug(
"ROS received from {}".format(self.
_topic_from))
53 now = rospy.get_time()
60 self._mqtt_client.publish(topic=self.
_topic_to, payload=payload)
64 """ Bridge from MQTT to ROS topic 66 bridge MQTT messages on `topic_from` to ROS topic `topic_to`. MQTT messages will be converted to `msg_type`. 69 def __init__(self, topic_from: str, topic_to: str, msg_type: Type[rospy.Message],
70 frequency: Optional[float] =
None, queue_size: int = 10):
76 self.
_interval =
None if frequency
is None else 1.0 / frequency
83 def _callback_mqtt(self, client: mqtt.Client, userdata: Dict, mqtt_msg: mqtt.MQTTMessage):
84 """ callback from MQTT """ 85 rospy.logdebug(
"MQTT received from {}".format(mqtt_msg.topic))
86 now = rospy.get_time()
91 self._publisher.publish(ros_msg)
93 except Exception
as e:
97 """ create ROS message from MQTT payload """ 99 if self._serialize.__name__ ==
"packb":
100 msg_dict = self.
_deserialize(mqtt_msg.payload, raw=
False)
103 return populate_instance(msg_dict, self.
_msg_type())
106 __all__ = [
'create_bridge',
'Bridge',
'RosToMqttBridge',
'MqttToRosBridge']