subscribers.py
Go to the documentation of this file.
1 # Software License Agreement (BSD License)
2 #
3 # Copyright (c) 2012, Willow Garage, Inc.
4 # Copyright (c) 2013, PAL Robotics SL
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
9 # are met:
10 #
11 # * Redistributions of source code must retain the above copyright
12 # notice, this list of conditions and the following disclaimer.
13 # * Redistributions in binary form must reproduce the above
14 # copyright notice, this list of conditions and the following
15 # disclaimer in the documentation and/or other materials provided
16 # with the distribution.
17 # * Neither the name of Willow Garage, Inc. nor the names of its
18 # contributors may be used to endorse or promote products derived
19 # from this software without specific prior written permission.
20 #
21 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 # POSSIBILITY OF SUCH DAMAGE.
33 
34 from threading import Lock
35 from rospy import Subscriber, logerr
36 from rostopic import get_topic_type
37 from rosbridge_library.internal import ros_loader, message_conversion
38 from rosbridge_library.internal.topics import TopicNotEstablishedException
39 from rosbridge_library.internal.topics import TypeConflictException
40 from rosbridge_library.internal.outgoing_message import OutgoingMessage
41 from rospy.msg import AnyMsg
42 
43 """ Manages and interfaces with ROS Subscriber objects. A single subscriber
44 is shared between multiple clients
45 """
46 
47 
49  """ Handles multiple clients for a single subscriber.
50 
51  Converts msgs to JSON before handing them to callbacks. Due to subscriber
52  callbacks being called in separate threads, must lock whenever modifying
53  or accessing the subscribed clients. """
54 
55  def __init__(self, topic, msg_type=None):
56  """ Register a subscriber on the specified topic.
57 
58  Keyword arguments:
59  topic -- the name of the topic to register the subscriber on
60  msg_type -- (optional) the type to register the subscriber as. If not
61  provided, an attempt will be made to infer the topic type
62 
63  Throws:
64  TopicNotEstablishedException -- if no msg_type was specified by the
65  caller and the topic is not yet established, so a topic type cannot
66  be inferred
67  TypeConflictException -- if the msg_type was specified by the
68  caller and the topic is established, and the established type is
69  different to the user-specified msg_type
70 
71  """
72  # First check to see if the topic is already established
73  topic_type = get_topic_type(topic)[0]
74 
75  # If it's not established and no type was specified, exception
76  if msg_type is None and topic_type is None:
77  raise TopicNotEstablishedException(topic)
78 
79  # Use the established topic type if none was specified
80  if msg_type is None:
81  msg_type = topic_type
82 
83  if msg_type == "__AnyMsg":
84  msg_class = AnyMsg
85  else:
86  # Load the message class, propagating any exceptions from bad msg types
87  msg_class = ros_loader.get_message_class(msg_type)
88 
89  # Make sure the specified msg type and established msg type are same
90  if topic_type is not None and topic_type != msg_class._type:
91  raise TypeConflictException(topic, topic_type, msg_class._type)
92 
93  # Create the subscriber and associated member variables
94  self.subscriptions = {}
95  self.lock = Lock()
96  self.topic = topic
97  self.msg_class = msg_class
98  self.subscriber = Subscriber(topic, msg_class, self.callback)
99 
100  def unregister(self):
101  self.subscriber.unregister()
102  with self.lock:
103  self.subscriptions.clear()
104 
105  def verify_type(self, msg_type):
106  """ Verify that the subscriber subscribes to messages of this type.
107 
108  Keyword arguments:
109  msg_type -- the type to check this subscriber against
110 
111  Throws:
112  Exception -- if ros_loader cannot load the specified msg type
113  TypeConflictException -- if the msg_type is different than the type of
114  this publisher
115 
116  """
117  if msg_type == "__AnyMsg":
118  return
119  if not ros_loader.get_message_class(msg_type) is self.msg_class:
120  raise TypeConflictException(self.topic,
121  self.msg_class._type, msg_type)
122 
123  def subscribe(self, client_id, callback):
124  """ Subscribe the specified client to this subscriber.
125 
126  Keyword arguments:
127  client_id -- the ID of the client subscribing
128  callback -- this client's callback, that will be called for incoming
129  messages
130 
131  """
132  with self.lock:
133  self.subscriptions[client_id] = callback
134  # If the topic is latched, add_callback will immediately invoke
135  # the given callback.
136  self.subscriber.impl.add_callback(self.callback, [callback])
137  self.subscriber.impl.remove_callback(self.callback, [callback])
138 
139  def unsubscribe(self, client_id):
140  """ Unsubscribe the specified client from this subscriber
141 
142  Keyword arguments:
143  client_id -- the ID of the client to unsubscribe
144 
145  """
146  with self.lock:
147  del self.subscriptions[client_id]
148 
149  def has_subscribers(self):
150  """ Return true if there are subscribers """
151  with self.lock:
152  return len(self.subscriptions) != 0
153 
154  def callback(self, msg, callbacks=None):
155  """ Callback for incoming messages on the rospy.Subscriber
156 
157  Passes the message to registered subscriber callbacks.
158 
159  Keyword Arguments:
160  msg - the ROS message coming from the subscriber
161  callbacks - subscriber callbacks to invoke
162 
163  """
164  outgoing = OutgoingMessage(msg)
165 
166  # Get the callbacks to call
167  if not callbacks:
168  with self.lock:
169  callbacks = list(self.subscriptions.values())
170 
171  # Pass the JSON to each of the callbacks
172  for callback in callbacks:
173  try:
174  callback(outgoing)
175  except Exception as exc:
176  # Do nothing if one particular callback fails except log it
177  logerr("Exception calling subscribe callback: %s", exc)
178 
179 
181  """
182  Keeps track of client subscriptions
183  """
184 
185  def __init__(self):
186  self._lock = Lock()
187  self._subscribers = {}
188 
189  def subscribe(self, client_id, topic, callback, msg_type=None):
190  """ Subscribe to a topic
191 
192  Keyword arguments:
193  client_id -- the ID of the client making this subscribe request
194  topic -- the name of the topic to subscribe to
195  callback -- the callback to call for incoming messages on the topic
196  msg_type -- (optional) the type of the topic
197 
198  """
199  with self._lock:
200  if not topic in self._subscribers:
201  self._subscribers[topic] = MultiSubscriber(topic, msg_type)
202 
203  if msg_type is not None:
204  self._subscribers[topic].verify_type(msg_type)
205 
206  self._subscribers[topic].subscribe(client_id, callback)
207 
208  def unsubscribe(self, client_id, topic):
209  """ Unsubscribe from a topic
210 
211  Keyword arguments:
212  client_id -- the ID of the client to unsubscribe
213  topic -- the topic to unsubscribe from
214 
215  """
216  with self._lock:
217  if topic in self._subscribers:
218  self._subscribers[topic].unsubscribe(client_id)
219 
220  if not self._subscribers[topic].has_subscribers():
221  self._subscribers[topic].unregister()
222  del self._subscribers[topic]
223 
224 
225 manager = SubscriberManager()
226 
rosbridge_library.internal.subscribers.MultiSubscriber.has_subscribers
def has_subscribers(self)
Definition: subscribers.py:149
rosbridge_library.internal.subscribers.MultiSubscriber.lock
lock
Definition: subscribers.py:95
rosbridge_library.internal.subscribers.SubscriberManager.__init__
def __init__(self)
Definition: subscribers.py:185
rosbridge_library.internal.subscribers.MultiSubscriber.subscriber
subscriber
Definition: subscribers.py:98
rosbridge_library.internal.topics.TopicNotEstablishedException
Definition: topics.py:36
rosbridge_library.internal.topics
Definition: topics.py:1
rosbridge_library.internal.subscribers.SubscriberManager
Definition: subscribers.py:180
rosbridge_library.internal.subscribers.MultiSubscriber.verify_type
def verify_type(self, msg_type)
Definition: subscribers.py:105
rosbridge_library.internal.subscribers.MultiSubscriber.callback
def callback(self, msg, callbacks=None)
Definition: subscribers.py:154
rosbridge_library.internal.subscribers.MultiSubscriber.unsubscribe
def unsubscribe(self, client_id)
Definition: subscribers.py:139
rosbridge_library.internal.subscribers.SubscriberManager._lock
_lock
Definition: subscribers.py:186
rosbridge_library.internal.subscribers.MultiSubscriber
Definition: subscribers.py:48
rosbridge_library.internal.topics.TypeConflictException
Definition: topics.py:43
rosbridge_library.internal.subscribers.SubscriberManager.subscribe
def subscribe(self, client_id, topic, callback, msg_type=None)
Definition: subscribers.py:189
rosbridge_library.internal.subscribers.MultiSubscriber.unregister
def unregister(self)
Definition: subscribers.py:100
rosbridge_library.internal.subscribers.SubscriberManager.unsubscribe
def unsubscribe(self, client_id, topic)
Definition: subscribers.py:208
rosbridge_library.internal.subscribers.MultiSubscriber.__init__
def __init__(self, topic, msg_type=None)
Definition: subscribers.py:55
rosbridge_library.internal.subscribers.MultiSubscriber.topic
topic
Definition: subscribers.py:96
rosbridge_library.internal.subscribers.MultiSubscriber.subscriptions
subscriptions
Definition: subscribers.py:94
rosbridge_library.internal.outgoing_message
Definition: outgoing_message.py:1
rosbridge_library.internal.subscribers.MultiSubscriber.subscribe
def subscribe(self, client_id, callback)
Definition: subscribers.py:123
rosbridge_library.internal.outgoing_message.OutgoingMessage
Definition: outgoing_message.py:10
rosbridge_library.internal
Definition: src/rosbridge_library/internal/__init__.py:1
rosbridge_library.internal.subscribers.SubscriberManager._subscribers
_subscribers
Definition: subscribers.py:187
rosbridge_library.internal.subscribers.MultiSubscriber.msg_class
msg_class
Definition: subscribers.py:97


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Tue Oct 3 2023 02:12:45