publishers.py
Go to the documentation of this file.
1 # Software License Agreement (BSD License)
2 #
3 # Copyright (c) 2012, Willow Garage, Inc.
4 # Copyright (c) 2014, Creativa 77 SRL
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
9 # are met:
10 #
11 # * Redistributions of source code must retain the above copyright
12 # notice, this list of conditions and the following disclaimer.
13 # * Redistributions in binary form must reproduce the above
14 # copyright notice, this list of conditions and the following
15 # disclaimer in the documentation and/or other materials provided
16 # with the distribution.
17 # * Neither the name of Willow Garage, Inc. nor the names of its
18 # contributors may be used to endorse or promote products derived
19 # from this software without specific prior written permission.
20 #
21 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 # POSSIBILITY OF SUCH DAMAGE.
33 
34 from time import time
35 from copy import copy
36 from threading import Lock, Timer
37 from rospy import Publisher, SubscribeListener
38 from rospy import logwarn
39 from rostopic import get_topic_type
40 from rosbridge_library.internal import ros_loader, message_conversion
41 from rosbridge_library.internal.topics import TopicNotEstablishedException, TypeConflictException
42 
43 
44 class PublisherConsistencyListener(SubscribeListener):
45  """ This class is used to solve the problem that sometimes we create a
46  publisher and then immediately publish a message, before the subscribers
47  have set up their connections.
48 
49  Call attach() to attach the listener to a publisher. It sets up a buffer
50  of outgoing messages, then when a new connection occurs, sends the messages
51  in the buffer.
52 
53  Call detach() to detach the listener from the publisher and restore the
54  original publish methods.
55 
56  After some particular timeout (default to 1 second), the listener stops
57  buffering messages as it is assumed by this point all subscribers will have
58  successfully set up their connections."""
59 
60  timeout = 1 # Timeout in seconds to wait for new subscribers
61  attached = False
62 
63  def attach(self, publisher):
64  """ Overrides the publisher's publish method, and attaches a subscribe
65  listener to the publisher, effectively routing incoming connections
66  and outgoing publish requests through this class instance """
67  # Do the attaching
68  self.publisher = publisher
69  publisher.impl.add_subscriber_listener(self)
70  self.publish = publisher.publish
71  publisher.publish = self.publish_override
72 
73  # Set state variables
74  self.lock = Lock()
75  self.established_time = time()
76  self.msg_buffer = []
77  self.attached = True
78 
79  def detach(self):
80  """ Restores the publisher's original publish method and unhooks the
81  subscribe listeners, effectively finishing with this object """
82  self.publisher.publish = self.publish
83  if self in self.publisher.impl.subscriber_listeners:
84  self.publisher.impl.subscriber_listeners.remove(self)
85  self.attached = False
86 
87  def peer_subscribe(self, topic_name, topic_publish, peer_publish):
88  """ Called whenever there's a new subscription.
89 
90  If we're still inside the subscription setup window, then we publish
91  any buffered messages to the peer.
92 
93  We also check if we're timed out, but if we are we don't detach (due
94  to threading complications), we just don't propagate buffered messages
95  """
96  if not self.timed_out():
97  with self.lock:
98  msgs = copy(self.msg_buffer)
99  for msg in msgs:
100  peer_publish(msg)
101 
102  def timed_out(self):
103  """ Checks to see how much time has elapsed since the publisher was
104  created """
105  return time() - self.established_time > self.timeout
106 
107  def publish_override(self, message):
108  """ The publisher's publish method is replaced with this publish method
109  which checks for timeout and if we haven't timed out, buffers outgoing
110  messages in preparation for new subscriptions """
111  if not self.timed_out():
112  with self.lock:
113  self.msg_buffer.append(message)
114  self.publish(message)
115 
116 
118  """ Keeps track of the clients that are using a particular publisher.
119 
120  Provides an API to publish messages and register clients that are using
121  this publisher """
122 
123  def __init__(self, topic, msg_type=None, latched_client_id=None, queue_size=100):
124  """ Register a publisher on the specified topic.
125 
126  Keyword arguments:
127  topic -- the name of the topic to register the publisher to
128  msg_type -- (optional) the type to register the publisher as. If not
129  provided, an attempt will be made to infer the topic type
130  latch -- (optional) if a client requested this publisher to be latched,
131  provide the client_id of that client here
132 
133  Throws:
134  TopicNotEstablishedException -- if no msg_type was specified by the
135  caller and the topic is not yet established, so a topic type cannot
136  be inferred
137  TypeConflictException -- if the msg_type was specified by the
138  caller and the topic is established, and the established type is
139  different to the user-specified msg_type
140 
141  """
142  # First check to see if the topic is already established
143  topic_type = get_topic_type(topic)[0]
144 
145  # If it's not established and no type was specified, exception
146  if msg_type is None and topic_type is None:
147  raise TopicNotEstablishedException(topic)
148 
149  # Use the established topic type if none was specified
150  if msg_type is None:
151  msg_type = topic_type
152 
153  # Load the message class, propagating any exceptions from bad msg types
154  msg_class = ros_loader.get_message_class(msg_type)
155 
156  # Make sure the specified msg type and established msg type are same
157  if topic_type is not None and topic_type != msg_class._type:
158  raise TypeConflictException(topic, topic_type, msg_class._type)
159 
160  # Create the publisher and associated member variables
161  self.clients = {}
162  self.latched_client_id = latched_client_id
163  self.topic = topic
164  self.msg_class = msg_class
165  self.publisher = Publisher(topic, msg_class, latch=(latched_client_id!=None), queue_size=queue_size)
167  self.listener.attach(self.publisher)
168 
169  def unregister(self):
170  """ Unregisters the publisher and clears the clients """
171  self.publisher.unregister()
172  self.clients.clear()
173 
174  def verify_type(self, msg_type):
175  """ Verify that the publisher publishes messages of the specified type.
176 
177  Keyword arguments:
178  msg_type -- the type to check this publisher against
179 
180  Throws:
181  Exception -- if ros_loader cannot load the specified msg type
182  TypeConflictException -- if the msg_type is different than the type of
183  this publisher
184 
185  """
186  if not ros_loader.get_message_class(msg_type) is self.msg_class:
187  raise TypeConflictException(self.topic,
188  self.msg_class._type, msg_type)
189  return
190 
191  def publish(self, msg):
192  """ Publish a message using this publisher.
193 
194  Keyword arguments:
195  msg -- the dict (json) message to publish
196 
197  Throws:
198  Exception -- propagates exceptions from message conversion if the
199  provided msg does not properly conform to the message type of this
200  publisher
201 
202  """
203  # First, check the publisher consistency listener to see if it's done
204  if self.listener.attached and self.listener.timed_out():
205  self.listener.detach()
206 
207  # Create a message instance
208  inst = self.msg_class()
209 
210  # Populate the instance, propagating any exceptions that may be thrown
211  message_conversion.populate_instance(msg, inst)
212 
213  # Publish the message
214  self.publisher.publish(inst)
215 
216  def register_client(self, client_id):
217  """ Register the specified client as a client of this publisher.
218 
219  Keyword arguments:
220  client_id -- the ID of the client using the publisher
221 
222  """
223  self.clients[client_id] = True
224 
225  def unregister_client(self, client_id):
226  """ Unregister the specified client from this publisher.
227 
228  If the specified client_id is not a client of this publisher, nothing
229  happens.
230 
231  Keyword arguments:
232  client_id -- the ID of the client to remove
233 
234  """
235  if client_id in self.clients:
236  del self.clients[client_id]
237 
238  def has_clients(self):
239  """ Return true if there are clients to this publisher. """
240  return len(self.clients) != 0
241 
242 
244  """ The PublisherManager keeps track of ROS publishers
245 
246  It maintains a MultiPublisher instance for each registered topic
247 
248  When unregistering a client, if there are no more clients for a publisher,
249  then that publisher is unregistered from the ROS Master
250  """
251 
252  def __init__(self):
253  self._publishers = {}
255  self.unregister_timeout = 10.0
256 
257  def register(self, client_id, topic, msg_type=None, latch=False, queue_size=100):
258  """ Register a publisher on the specified topic.
259 
260  Publishers are shared between clients, so a single MultiPublisher
261  instance is created per topic, even if multiple clients register.
262 
263  Keyword arguments:
264  client_id -- the ID of the client making this request
265  topic -- the name of the topic to publish on
266  msg_type -- (optional) the type to publish
267  latch -- (optional) whether to make this publisher latched
268  queue_size -- (optional) rospy publisher queue_size to use
269 
270  Throws:
271  Exception -- exceptions are propagated from the MultiPublisher if
272  there is a problem loading the specified msg class or establishing
273  the publisher
274 
275  """
276  latched_client_id = client_id if latch else None
277  if not topic in self._publishers:
278  self._publishers[topic] = MultiPublisher(topic, msg_type, latched_client_id,
279  queue_size=queue_size)
280  elif latch and self._publishers[topic].latched_client_id != client_id:
281  logwarn("Client ID %s attempted to register topic [%s] as latched " +
282  "but this topic was previously registered.", client_id, topic)
283  logwarn("Only a single registered latched publisher is supported at the time")
284  elif not latch and self._publishers[topic].latched_client_id:
285  logwarn("New non-latched publisher registration for topic [%s] which is " +
286  "already registered as latched. but this topic was previously " +
287  "registered.", topic)
288  logwarn("Only a single registered latched publisher is supported at the time")
289 
290  if msg_type is not None:
291  self._publishers[topic].verify_type(msg_type)
292 
293  self._publishers[topic].register_client(client_id)
294 
295  def unregister(self, client_id, topic):
296  """ Unregister a client from the publisher for the given topic.
297  Will wait some time before actually unregistering, it is done in
298  _unregister_impl
299 
300  If there are no clients remaining for that publisher, then the
301  publisher is unregistered from the ROS Master
302 
303  Keyword arguments:
304  client_id -- the ID of the client making this request
305  topic -- the topic to unregister the publisher for
306 
307  """
308  if not topic in self._publishers:
309  return
310 
311  self._publishers[topic].unregister_client(client_id)
312  if topic in self.unregister_timers:
313  self.unregister_timers[topic].cancel()
314  del self.unregister_timers[topic]
315  self.unregister_timers[topic] = Timer(self.unregister_timeout, self._unregister_impl,
316  [topic])
317  self.unregister_timers[topic].start()
318 
319  def _unregister_impl(self, topic):
320  if not self._publishers[topic].has_clients():
321  self._publishers[topic].unregister()
322  del self._publishers[topic]
323  del self.unregister_timers[topic]
324 
325  def unregister_all(self, client_id):
326  """ Unregisters a client from all publishers that they are registered
327  to.
328 
329  Keyword arguments:
330  client_id -- the ID of the client making this request """
331  for topic in self._publishers.keys():
332  self.unregister(client_id, topic)
333 
334  def publish(self, client_id, topic, msg, latch=False, queue_size=100):
335  """ Publish a message on the given topic.
336 
337  Tries to create a publisher on the topic if one does not already exist.
338 
339  Keyword arguments:
340  client_id -- the ID of the client making this request
341  topic -- the topic to publish the message on
342  msg -- a JSON-like dict of fields and values
343  latch -- (optional) whether to make this publisher latched
344  queue_size -- (optional) rospy publisher queue_size to use
345 
346  Throws:
347  Exception -- a variety of exceptions are propagated. They can be
348  thrown if there is a problem setting up or getting the publisher,
349  or if the provided msg does not map to the msg class of the publisher.
350 
351  """
352  self.register(client_id, topic, latch=latch, queue_size=queue_size)
353 
354  self._publishers[topic].publish(msg)
355 
356 
357 manager = PublisherManager()
def peer_subscribe(self, topic_name, topic_publish, peer_publish)
Definition: publishers.py:87
def register(self, client_id, topic, msg_type=None, latch=False, queue_size=100)
Definition: publishers.py:257
def __init__(self, topic, msg_type=None, latched_client_id=None, queue_size=100)
Definition: publishers.py:123
def publish(self, client_id, topic, msg, latch=False, queue_size=100)
Definition: publishers.py:334


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Wed Jun 3 2020 03:55:14