Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034 from threading import Lock
00035 from rospy import Subscriber, logerr
00036 from rostopic import get_topic_type
00037 from rosbridge_library.internal import ros_loader, message_conversion
00038 from rosbridge_library.internal.topics import TopicNotEstablishedException
00039 from rosbridge_library.internal.topics import TypeConflictException
00040
00041 """ Manages and interfaces with ROS Subscriber objects. A single subscriber
00042 is shared between multiple clients
00043 """
00044
00045
00046 class MultiSubscriber():
00047 """ Handles multiple clients for a single subscriber.
00048
00049 Converts msgs to JSON before handing them to callbacks. Due to subscriber
00050 callbacks being called in separate threads, must lock whenever modifying
00051 or accessing the subscribed clients. """
00052
00053 def __init__(self, topic, msg_type=None):
00054 """ Register a subscriber on the specified topic.
00055
00056 Keyword arguments:
00057 topic -- the name of the topic to register the subscriber on
00058 msg_type -- (optional) the type to register the subscriber as. If not
00059 provided, an attempt will be made to infer the topic type
00060
00061 Throws:
00062 TopicNotEstablishedException -- if no msg_type was specified by the
00063 caller and the topic is not yet established, so a topic type cannot
00064 be inferred
00065 TypeConflictException -- if the msg_type was specified by the
00066 caller and the topic is established, and the established type is
00067 different to the user-specified msg_type
00068
00069 """
00070
00071 topic_type = get_topic_type(topic)[0]
00072
00073
00074 if msg_type is None and topic_type is None:
00075 raise TopicNotEstablishedException(topic)
00076
00077
00078 if msg_type is None:
00079 msg_type = topic_type
00080
00081
00082 msg_class = ros_loader.get_message_class(msg_type)
00083
00084
00085 if topic_type is not None and topic_type != msg_class._type:
00086 raise TypeConflictException(topic, topic_type, msg_class._type)
00087
00088
00089 self.subscriptions = {}
00090 self.lock = Lock()
00091 self.topic = topic
00092 self.msg_class = msg_class
00093 self.subscriber = Subscriber(topic, msg_class, self.callback)
00094
00095 def unregister(self):
00096 self.subscriber.unregister()
00097 with self.lock:
00098 self.subscriptions.clear()
00099
00100 def verify_type(self, msg_type):
00101 """ Verify that the subscriber subscribes to messages of this type.
00102
00103 Keyword arguments:
00104 msg_type -- the type to check this subscriber against
00105
00106 Throws:
00107 Exception -- if ros_loader cannot load the specified msg type
00108 TypeConflictException -- if the msg_type is different than the type of
00109 this publisher
00110
00111 """
00112 if not ros_loader.get_message_class(msg_type) is self.msg_class:
00113 raise TypeConflictException(self.topic,
00114 self.msg_class._type, msg_type)
00115 return
00116
00117 def subscribe(self, client_id, callback):
00118 """ Subscribe the specified client to this subscriber.
00119
00120 Keyword arguments:
00121 client_id -- the ID of the client subscribing
00122 callback -- this client's callback, that will be called for incoming
00123 messages
00124
00125 """
00126 with self.lock:
00127 self.subscriptions[client_id] = callback
00128
00129
00130 self.subscriber.impl.add_callback(self.callback, [callback])
00131 self.subscriber.impl.remove_callback(self.callback, [callback])
00132
00133 def unsubscribe(self, client_id):
00134 """ Unsubscribe the specified client from this subscriber
00135
00136 Keyword arguments:
00137 client_id -- the ID of the client to unsubscribe
00138
00139 """
00140 with self.lock:
00141 del self.subscriptions[client_id]
00142
00143 def has_subscribers(self):
00144 """ Return true if there are subscribers """
00145 with self.lock:
00146 ret = len(self.subscriptions) != 0
00147 return ret
00148
00149 def callback(self, msg, callbacks=None):
00150 """ Callback for incoming messages on the rospy.Subscriber
00151
00152 Converts the incoming msg to JSON, then passes the JSON to the
00153 registered subscriber callbacks.
00154
00155 Keyword Arguments:
00156 msg - the ROS message coming from the subscriber
00157 callbacks - subscriber callbacks to invoke
00158
00159 """
00160
00161 json = None
00162 try:
00163 json = message_conversion.extract_values(msg)
00164 except:
00165 return
00166
00167
00168 if not callbacks:
00169 with self.lock:
00170 callbacks = self.subscriptions.values()
00171
00172
00173 for callback in callbacks:
00174 try:
00175 callback(json)
00176 except Exception as exc:
00177
00178 logerr("Exception calling subscribe callback: %s", exc)
00179 pass
00180
00181
00182 class SubscriberManager():
00183 """
00184 Keeps track of client subscriptions
00185 """
00186
00187 def __init__(self):
00188 self._subscribers = {}
00189
00190 def subscribe(self, client_id, topic, callback, msg_type=None):
00191 """ Subscribe to a topic
00192
00193 Keyword arguments:
00194 client_id -- the ID of the client making this subscribe request
00195 topic -- the name of the topic to subscribe to
00196 callback -- the callback to call for incoming messages on the topic
00197 msg_type -- (optional) the type of the topic
00198
00199 """
00200 if not topic in self._subscribers:
00201 self._subscribers[topic] = MultiSubscriber(topic, msg_type)
00202
00203 if msg_type is not None:
00204 self._subscribers[topic].verify_type(msg_type)
00205
00206 self._subscribers[topic].subscribe(client_id, callback)
00207
00208 def unsubscribe(self, client_id, topic):
00209 """ Unsubscribe from a topic
00210
00211 Keyword arguments:
00212 client_id -- the ID of the client to unsubscribe
00213 topic -- the topic to unsubscribe from
00214
00215 """
00216 if not topic in self._subscribers:
00217 return
00218
00219 self._subscribers[topic].unsubscribe(client_id)
00220
00221 if not self._subscribers[topic].has_subscribers():
00222 self._subscribers[topic].unregister()
00223 del self._subscribers[topic]
00224
00225
00226 manager = SubscriberManager()
00227