publishers.py
Go to the documentation of this file.
1 # Software License Agreement (BSD License)
2 #
3 # Copyright (c) 2012, Willow Garage, Inc.
4 # Copyright (c) 2014, Creativa 77 SRL
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
9 # are met:
10 #
11 # * Redistributions of source code must retain the above copyright
12 # notice, this list of conditions and the following disclaimer.
13 # * Redistributions in binary form must reproduce the above
14 # copyright notice, this list of conditions and the following
15 # disclaimer in the documentation and/or other materials provided
16 # with the distribution.
17 # * Neither the name of Willow Garage, Inc. nor the names of its
18 # contributors may be used to endorse or promote products derived
19 # from this software without specific prior written permission.
20 #
21 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 # POSSIBILITY OF SUCH DAMAGE.
33 
34 from time import time
35 from copy import copy
36 from threading import Lock, Timer
37 from rospy import Publisher, SubscribeListener
38 from rospy import logwarn
39 from rostopic import get_topic_type
40 from rosbridge_library.internal import ros_loader, message_conversion
41 from rosbridge_library.internal.topics import TopicNotEstablishedException, TypeConflictException
42 
43 
44 class PublisherConsistencyListener(SubscribeListener):
45  """ This class is used to solve the problem that sometimes we create a
46  publisher and then immediately publish a message, before the subscribers
47  have set up their connections.
48 
49  Call attach() to attach the listener to a publisher. It sets up a buffer
50  of outgoing messages, then when a new connection occurs, sends the messages
51  in the buffer.
52 
53  Call detach() to detach the listener from the publisher and restore the
54  original publish methods.
55 
56  After some particular timeout (default to 1 second), the listener stops
57  buffering messages as it is assumed by this point all subscribers will have
58  successfully set up their connections."""
59 
60  timeout = 1 # Timeout in seconds to wait for new subscribers
61  attached = False
62 
63  def attach(self, publisher):
64  """ Overrides the publisher's publish method, and attaches a subscribe
65  listener to the publisher, effectively routing incoming connections
66  and outgoing publish requests through this class instance """
67  # Do the attaching
68  self.publisher = publisher
69  publisher.impl.add_subscriber_listener(self)
70  self.publish = publisher.publish
71  publisher.publish = self.publish_override
72 
73  # Set state variables
74  self.lock = Lock()
75  self.established_time = time()
76  self.msg_buffer = []
77  self.attached = True
78 
79  def detach(self):
80  """ Restores the publisher's original publish method and unhooks the
81  subscribe listeners, effectively finishing with this object """
82  self.publisher.publish = self.publish
83  if self in self.publisher.impl.subscriber_listeners:
84  self.publisher.impl.subscriber_listeners.remove(self)
85  self.attached = False
86 
87  def peer_subscribe(self, topic_name, topic_publish, peer_publish):
88  """ Called whenever there's a new subscription.
89 
90  If we're still inside the subscription setup window, then we publish
91  any buffered messages to the peer.
92 
93  We also check if we're timed out, but if we are we don't detach (due
94  to threading complications), we just don't propagate buffered messages
95  """
96  if not self.timed_out():
97  self.lock.acquire()
98  msgs = copy(self.msg_buffer)
99  self.lock.release()
100  for msg in msgs:
101  peer_publish(msg)
102 
103  def timed_out(self):
104  """ Checks to see how much time has elapsed since the publisher was
105  created """
106  return time() - self.established_time > self.timeout
107 
108  def publish_override(self, message):
109  """ The publisher's publish method is replaced with this publish method
110  which checks for timeout and if we haven't timed out, buffers outgoing
111  messages in preparation for new subscriptions """
112  if not self.timed_out():
113  self.lock.acquire()
114  self.msg_buffer.append(message)
115  self.lock.release()
116  self.publish(message)
117 
118 
120  """ Keeps track of the clients that are using a particular publisher.
121 
122  Provides an API to publish messages and register clients that are using
123  this publisher """
124 
125  def __init__(self, topic, msg_type=None, latched_client_id=None, queue_size=100):
126  """ Register a publisher on the specified topic.
127 
128  Keyword arguments:
129  topic -- the name of the topic to register the publisher to
130  msg_type -- (optional) the type to register the publisher as. If not
131  provided, an attempt will be made to infer the topic type
132  latch -- (optional) if a client requested this publisher to be latched,
133  provide the client_id of that client here
134 
135  Throws:
136  TopicNotEstablishedException -- if no msg_type was specified by the
137  caller and the topic is not yet established, so a topic type cannot
138  be inferred
139  TypeConflictException -- if the msg_type was specified by the
140  caller and the topic is established, and the established type is
141  different to the user-specified msg_type
142 
143  """
144  # First check to see if the topic is already established
145  topic_type = get_topic_type(topic)[0]
146 
147  # If it's not established and no type was specified, exception
148  if msg_type is None and topic_type is None:
149  raise TopicNotEstablishedException(topic)
150 
151  # Use the established topic type if none was specified
152  if msg_type is None:
153  msg_type = topic_type
154 
155  # Load the message class, propagating any exceptions from bad msg types
156  msg_class = ros_loader.get_message_class(msg_type)
157 
158  # Make sure the specified msg type and established msg type are same
159  if topic_type is not None and topic_type != msg_class._type:
160  raise TypeConflictException(topic, topic_type, msg_class._type)
161 
162  # Create the publisher and associated member variables
163  self.clients = {}
164  self.latched_client_id = latched_client_id
165  self.topic = topic
166  self.msg_class = msg_class
167  self.publisher = Publisher(topic, msg_class, latch=(latched_client_id!=None), queue_size=queue_size)
169  self.listener.attach(self.publisher)
170 
171  def unregister(self):
172  """ Unregisters the publisher and clears the clients """
173  self.publisher.unregister()
174  self.clients.clear()
175 
176  def verify_type(self, msg_type):
177  """ Verify that the publisher publishes messages of the specified type.
178 
179  Keyword arguments:
180  msg_type -- the type to check this publisher against
181 
182  Throws:
183  Exception -- if ros_loader cannot load the specified msg type
184  TypeConflictException -- if the msg_type is different than the type of
185  this publisher
186 
187  """
188  if not ros_loader.get_message_class(msg_type) is self.msg_class:
189  raise TypeConflictException(self.topic,
190  self.msg_class._type, msg_type)
191  return
192 
193  def publish(self, msg):
194  """ Publish a message using this publisher.
195 
196  Keyword arguments:
197  msg -- the dict (json) message to publish
198 
199  Throws:
200  Exception -- propagates exceptions from message conversion if the
201  provided msg does not properly conform to the message type of this
202  publisher
203 
204  """
205  # First, check the publisher consistency listener to see if it's done
206  if self.listener.attached and self.listener.timed_out():
207  self.listener.detach()
208 
209  # Create a message instance
210  inst = self.msg_class()
211 
212  # Populate the instance, propagating any exceptions that may be thrown
213  message_conversion.populate_instance(msg, inst)
214 
215  # Publish the message
216  self.publisher.publish(inst)
217 
218  def register_client(self, client_id):
219  """ Register the specified client as a client of this publisher.
220 
221  Keyword arguments:
222  client_id -- the ID of the client using the publisher
223 
224  """
225  self.clients[client_id] = True
226 
227  def unregister_client(self, client_id):
228  """ Unregister the specified client from this publisher.
229 
230  If the specified client_id is not a client of this publisher, nothing
231  happens.
232 
233  Keyword arguments:
234  client_id -- the ID of the client to remove
235 
236  """
237  if client_id in self.clients:
238  del self.clients[client_id]
239 
240  def has_clients(self):
241  """ Return true if there are clients to this publisher. """
242  return len(self.clients) != 0
243 
244 
246  """ The PublisherManager keeps track of ROS publishers
247 
248  It maintains a MultiPublisher instance for each registered topic
249 
250  When unregistering a client, if there are no more clients for a publisher,
251  then that publisher is unregistered from the ROS Master
252  """
253 
254  def __init__(self):
255  self._publishers = {}
257  self.unregister_timeout = 10.0
258 
259  def register(self, client_id, topic, msg_type=None, latch=False, queue_size=100):
260  """ Register a publisher on the specified topic.
261 
262  Publishers are shared between clients, so a single MultiPublisher
263  instance is created per topic, even if multiple clients register.
264 
265  Keyword arguments:
266  client_id -- the ID of the client making this request
267  topic -- the name of the topic to publish on
268  msg_type -- (optional) the type to publish
269  latch -- (optional) whether to make this publisher latched
270  queue_size -- (optional) rospy publisher queue_size to use
271 
272  Throws:
273  Exception -- exceptions are propagated from the MultiPublisher if
274  there is a problem loading the specified msg class or establishing
275  the publisher
276 
277  """
278  latched_client_id = client_id if latch else None
279  if not topic in self._publishers:
280  self._publishers[topic] = MultiPublisher(topic, msg_type, latched_client_id,
281  queue_size=queue_size)
282  elif latch and self._publishers[topic].latched_client_id != client_id:
283  logwarn("Client ID %s attempted to register topic [%s] as latched " +
284  "but this topic was previously registered.", client_id, topic)
285  logwarn("Only a single registered latched publisher is supported at the time")
286  elif not latch and self._publishers[topic].latched_client_id:
287  logwarn("New non-latched publisher registration for topic [%s] which is " +
288  "already registered as latched. but this topic was previously " +
289  "registered.", topic)
290  logwarn("Only a single registered latched publisher is supported at the time")
291 
292  if msg_type is not None:
293  self._publishers[topic].verify_type(msg_type)
294 
295  self._publishers[topic].register_client(client_id)
296 
297  def unregister(self, client_id, topic):
298  """ Unregister a client from the publisher for the given topic.
299  Will wait some time before actually unregistering, it is done in
300  _unregister_impl
301 
302  If there are no clients remaining for that publisher, then the
303  publisher is unregistered from the ROS Master
304 
305  Keyword arguments:
306  client_id -- the ID of the client making this request
307  topic -- the topic to unregister the publisher for
308 
309  """
310  if not topic in self._publishers:
311  return
312 
313  self._publishers[topic].unregister_client(client_id)
314  if topic in self.unregister_timers:
315  self.unregister_timers[topic].cancel()
316  del self.unregister_timers[topic]
317  self.unregister_timers[topic] = Timer(self.unregister_timeout, self._unregister_impl,
318  [topic])
319  self.unregister_timers[topic].start()
320 
321  def _unregister_impl(self, topic):
322  if not self._publishers[topic].has_clients():
323  self._publishers[topic].unregister()
324  del self._publishers[topic]
325  del self.unregister_timers[topic]
326 
327  def unregister_all(self, client_id):
328  """ Unregisters a client from all publishers that they are registered
329  to.
330 
331  Keyword arguments:
332  client_id -- the ID of the client making this request """
333  for topic in self._publishers.keys():
334  self.unregister(client_id, topic)
335 
336  def publish(self, client_id, topic, msg, latch=False, queue_size=100):
337  """ Publish a message on the given topic.
338 
339  Tries to create a publisher on the topic if one does not already exist.
340 
341  Keyword arguments:
342  client_id -- the ID of the client making this request
343  topic -- the topic to publish the message on
344  msg -- a JSON-like dict of fields and values
345  latch -- (optional) whether to make this publisher latched
346  queue_size -- (optional) rospy publisher queue_size to use
347 
348  Throws:
349  Exception -- a variety of exceptions are propagated. They can be
350  thrown if there is a problem setting up or getting the publisher,
351  or if the provided msg does not map to the msg class of the publisher.
352 
353  """
354  self.register(client_id, topic, latch=latch, queue_size=queue_size)
355 
356  self._publishers[topic].publish(msg)
357 
358 
359 manager = PublisherManager()
def peer_subscribe(self, topic_name, topic_publish, peer_publish)
Definition: publishers.py:87
def register(self, client_id, topic, msg_type=None, latch=False, queue_size=100)
Definition: publishers.py:259
def __init__(self, topic, msg_type=None, latched_client_id=None, queue_size=100)
Definition: publishers.py:125
def publish(self, client_id, topic, msg, latch=False, queue_size=100)
Definition: publishers.py:336


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Fri May 10 2019 02:17:02