Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034 from threading import Lock
00035 from rospy import Subscriber, logerr
00036 from rostopic import get_topic_type
00037 from rosbridge_library.internal import ros_loader, message_conversion
00038 from rosbridge_library.internal.topics import TopicNotEstablishedException
00039 from rosbridge_library.internal.topics import TypeConflictException
00040
00041 """ Manages and interfaces with ROS Subscriber objects. A single subscriber
00042 is shared between multiple clients
00043 """
00044
00045
00046 class MultiSubscriber():
00047 """ Handles multiple clients for a single subscriber.
00048
00049 Converts msgs to JSON before handing them to callbacks. Due to subscriber
00050 callbacks being called in separate threads, must lock whenever modifying
00051 or accessing the subscribed clients. """
00052
00053 def __init__(self, topic, msg_type=None):
00054 """ Register a subscriber on the specified topic.
00055
00056 Keyword arguments:
00057 topic -- the name of the topic to register the subscriber on
00058 msg_type -- (optional) the type to register the subscriber as. If not
00059 provided, an attempt will be made to infer the topic type
00060
00061 Throws:
00062 TopicNotEstablishedException -- if no msg_type was specified by the
00063 caller and the topic is not yet established, so a topic type cannot
00064 be inferred
00065 TypeConflictException -- if the msg_type was specified by the
00066 caller and the topic is established, and the established type is
00067 different to the user-specified msg_type
00068
00069 """
00070
00071 topic_type = get_topic_type(topic)[0]
00072
00073
00074 if msg_type is None and topic_type is None:
00075 raise TopicNotEstablishedException(topic)
00076
00077
00078 if msg_type is None:
00079 msg_type = topic_type
00080
00081
00082 msg_class = ros_loader.get_message_class(msg_type)
00083
00084
00085 if topic_type is not None and topic_type != msg_class._type:
00086 raise TypeConflictException(topic, topic_type, msg_class._type)
00087
00088
00089 self.subscriptions = {}
00090 self.lock = Lock()
00091 self.topic = topic
00092 self.msg_class = msg_class
00093 self.subscriber = Subscriber(topic, msg_class, self.callback)
00094
00095 def unregister(self):
00096 self.subscriber.unregister()
00097 with self.lock:
00098 self.subscriptions.clear()
00099
00100 def verify_type(self, msg_type):
00101 """ Verify that the subscriber subscribes to messages of this type.
00102
00103 Keyword arguments:
00104 msg_type -- the type to check this subscriber against
00105
00106 Throws:
00107 Exception -- if ros_loader cannot load the specified msg type
00108 TypeConflictException -- if the msg_type is different than the type of
00109 this publisher
00110
00111 """
00112 if not ros_loader.get_message_class(msg_type) is self.msg_class:
00113 raise TypeConflictException(self.topic,
00114 self.msg_class._type, msg_type)
00115 return
00116
00117 def subscribe(self, client_id, callback):
00118 """ Subscribe the specified client to this subscriber.
00119
00120 Keyword arguments:
00121 client_id -- the ID of the client subscribing
00122 callback -- this client's callback, that will be called for incoming
00123 messages
00124
00125 """
00126 with self.lock:
00127 self.subscriptions[client_id] = callback
00128
00129
00130 self.subscriber.impl.add_callback(self.callback, [callback])
00131 self.subscriber.impl.remove_callback(self.callback, [callback])
00132
00133 def unsubscribe(self, client_id):
00134 """ Unsubscribe the specified client from this subscriber
00135
00136 Keyword arguments:
00137 client_id -- the ID of the client to unsubscribe
00138
00139 """
00140 with self.lock:
00141 del self.subscriptions[client_id]
00142
00143 def has_subscribers(self):
00144 """ Return true if there are subscribers """
00145 with self.lock:
00146 ret = len(self.subscriptions) != 0
00147 return ret
00148
00149 def callback(self, msg, callbacks=None):
00150 """ Callback for incoming messages on the rospy.Subscriber
00151
00152 Converts the incoming msg to JSON, then passes the JSON to the
00153 registered subscriber callbacks.
00154
00155 Keyword Arguments:
00156 msg - the ROS message coming from the subscriber
00157 callbacks - subscriber callbacks to invoke
00158
00159 """
00160
00161 json = None
00162 try:
00163 json = message_conversion.extract_values(msg)
00164 except Exception as exc:
00165 logerr("Exception while converting messages in subscriber callback : %s", exc)
00166 return
00167
00168
00169 if not callbacks:
00170 with self.lock:
00171 callbacks = self.subscriptions.values()
00172
00173
00174 for callback in callbacks:
00175 try:
00176 callback(json)
00177 except Exception as exc:
00178
00179 logerr("Exception calling subscribe callback: %s", exc)
00180 pass
00181
00182
00183 class SubscriberManager():
00184 """
00185 Keeps track of client subscriptions
00186 """
00187
00188 def __init__(self):
00189 self._subscribers = {}
00190
00191 def subscribe(self, client_id, topic, callback, msg_type=None):
00192 """ Subscribe to a topic
00193
00194 Keyword arguments:
00195 client_id -- the ID of the client making this subscribe request
00196 topic -- the name of the topic to subscribe to
00197 callback -- the callback to call for incoming messages on the topic
00198 msg_type -- (optional) the type of the topic
00199
00200 """
00201 if not topic in self._subscribers:
00202 self._subscribers[topic] = MultiSubscriber(topic, msg_type)
00203
00204 if msg_type is not None:
00205 self._subscribers[topic].verify_type(msg_type)
00206
00207 self._subscribers[topic].subscribe(client_id, callback)
00208
00209 def unsubscribe(self, client_id, topic):
00210 """ Unsubscribe from a topic
00211
00212 Keyword arguments:
00213 client_id -- the ID of the client to unsubscribe
00214 topic -- the topic to unsubscribe from
00215
00216 """
00217 if not topic in self._subscribers:
00218 return
00219
00220 self._subscribers[topic].unsubscribe(client_id)
00221
00222 if not self._subscribers[topic].has_subscribers():
00223 self._subscribers[topic].unregister()
00224 del self._subscribers[topic]
00225
00226
00227 manager = SubscriberManager()
00228