1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 """
36 rospy implementation of topics.
37
38 Client API
39 ==========
40
41 L{Publisher} and L{Subscriber} are the client API for topics.
42
43 Internal Implementation
44 =======================
45
46 Internally, L{_TopicImpl} instances (_PublisherImpl/_SubscriberImpl)
47 are used to manage actual transport connections. The L{_TopicManager}
48 is responsible for tracking the system-wide state of publications and
49 subscriptions as well as the L{_TopicImpl} instances. More info is below.
50
51 L{_TopicManager}
52 ================
53
54 The L{_TopicManager} does the backend topic bookkeeping for the local
55 node. Use L{get_topic_manager()} to access singleton. Actual topic
56 implementations are done through the
57 L{_TopicImpl}/L{_PublisherImpl}/L{_SubscriberImpl} hierarchy. Client
58 code generates instances of type L{Publisher}/L{Subscriber}, which
59 enable to client to create multiple publishers/subscribers of that
60 topic that get controlled access to the underlying share connections.
61
62 Common parent classes for all rospy topics. The rospy topic autogenerators
63 create classes that are children of these implementations.
64 """
65
66
67 import struct
68 try:
69 from cStringIO import StringIO
70 import thread as _thread
71 python3 = 0
73 return isinstance(s, basestring)
74 except ImportError:
75 python3 = 1
76 from io import StringIO, BytesIO
77 import _thread
79 return isinstance(s, str)
80
81 import threading
82 import logging
83 import time
84
85 from itertools import chain
86 import traceback
87
88 import roslib.message
89
90 from rospy.core import *
91 from rospy.exceptions import ROSSerializationException, TransportTerminated
92 from rospy.msg import serialize_message, args_kwds_to_message
93
94 from rospy.impl.registration import get_topic_manager, set_topic_manager, Registration, get_registration_listeners
95 from rospy.impl.tcpros import get_tcpros_handler, DEFAULT_BUFF_SIZE
96 from rospy.impl.transport import DeadTransport
97
98 _logger = logging.getLogger('rospy.topics')
99
100
101 Message = roslib.message.Message
102
103
104
105
106
107
108
109
111 """Base class of L{Publisher} and L{Subscriber}"""
112
113 - def __init__(self, name, data_class, reg_type):
114 """
115 @param name: graph resource name of topic, e.g. 'laser'.
116 @type name: str
117 @param data_class: message class for serialization
118 @type data_class: L{Message}
119 @param reg_type Registration.PUB or Registration.SUB
120 @type reg_type: str
121 @raise ValueError: if parameters are invalid
122 """
123
124 if not name or not isstring(name):
125 raise ValueError("topic name is not a non-empty string")
126 try:
127 if python3 == 1:
128 name.encode("utf-8")
129 else:
130 name = name.encode("utf-8")
131 except UnicodeError:
132 raise ValueError("topic name must be ascii/utf-8 compatible")
133 if data_class is None:
134 raise ValueError("topic parameter 'data_class' is not initialized")
135 if not type(data_class) == type:
136 raise ValueError("data_class [%s] is not a class"%data_class)
137 if not issubclass(data_class, roslib.message.Message):
138 raise ValueError("data_class [%s] is not a message data class"%data_class.__class__.__name__)
139
140 if not roslib.names.is_legal_name(name):
141 import warnings
142 warnings.warn("'%s' is not a legal ROS graph resource name. This may cause problems with other ROS tools"%name, stacklevel=2)
143
144
145
146 if not rospy.core.is_initialized():
147 self.resolved_name = rospy.names.resolve_name_without_node_name(name)
148 else:
149
150 self.resolved_name = resolve_name(name)
151
152 self.name = self.resolved_name
153
154 self.data_class = data_class
155 self.type = data_class._type
156 self.md5sum = data_class._md5sum
157 self.reg_type = reg_type
158 self.impl = get_topic_manager().acquire_impl(reg_type, self.resolved_name, data_class)
159
161 """
162 get the number of connections to other ROS nodes for this topic. For a Publisher,
163 this corresponds to the number of nodes subscribing. For a Subscriber, the number
164 of publishers.
165 @return: number of connections
166 @rtype: int
167 """
168 return self.impl.get_num_connections()
169
171 """
172 unpublish/unsubscribe from topic. Topic instance is no longer
173 valid after this call. Additional calls to unregister() have no effect.
174 """
175
176
177 resolved_name = self.resolved_name
178 if resolved_name and self.impl:
179 get_topic_manager().release_impl(self.reg_type, resolved_name)
180 self.impl = self.resolved_name = self.type = self.md5sum = self.data_class = None
181
183 """
184 Base class of internal topic implementations. Each topic has a
185 singleton _TopicImpl implementation for managing the underlying
186 connections.
187 """
188
190 """
191 Base constructor
192 @param name: graph resource name of topic, e.g. 'laser'.
193 @type name: str
194 @param data_class: message data class
195 @type data_class: L{Message}
196 """
197
198
199 self.resolved_name = resolve_name(name)
200 self.name = self.resolved_name
201
202 self.data_class = data_class
203 self.type = data_class._type
204 self.handler = None
205 self.seq = 0
206
207
208
209
210
211
212
213 self.c_lock = threading.RLock()
214 self.connections = []
215 self.closed = False
216
217 self.ref_count = 0
218
219 self.dead_connections = []
220
222
223 if self.closed:
224 return
225 if self.connections is not None:
226 for c in self.connections:
227 try:
228 c.close()
229 except:
230 pass
231 del self.connections[:]
232 del self.dead_connections[:]
233 self.c_lock = self.dead_connections = self.connections = self.handler = self.data_class = self.type = None
234
236 """close I/O"""
237 if self.closed:
238 return
239 self.closed = True
240 if self.c_lock is not None:
241 with self.c_lock:
242 for c in self.connections:
243 try:
244 if c is not None:
245 c.close()
246 except:
247
248 _logger.error(traceback.format_exc())
249 del self.connections[:]
250 self.handler = None
251
252
253
254
255
256
258 with self.c_lock:
259 return len(self.connections)
260
262 """
263 Query whether or not a connection with the associated \a
264 endpoint has been added to this object.
265 @param endpoint_id: endpoint ID associated with connection.
266 @type endpoint_id: str
267 """
268
269 conn = self.connections
270 for c in conn:
271 if c.endpoint_id == endpoint_id:
272 return True
273 return False
274
276 """
277 Check to see if this topic is connected to other publishers/subscribers
278 @return: True if topic is connected
279 @rtype: bool
280 """
281 if self.connections:
282 return True
283 return False
284
286 """
287 Add a connection to this topic.
288 @param c: connection instance
289 @type c: Transport
290 @return: True if connection was added
291 @rtype: bool
292 """
293 with self.c_lock:
294
295
296
297 new_connections = self.connections[:]
298 new_connections.append(c)
299 self.connections = new_connections
300
301
302 c.set_cleanup_callback(self.remove_connection)
303
304 return True
305
307 """
308 Remove connection from topic.
309 @param c: connection instance to remove
310 @type c: Transport
311 """
312 with self.c_lock:
313
314
315
316 new_connections = self.connections[:]
317 new_dead_connections = self.dead_connections[:]
318 if c in new_connections:
319 new_connections.remove(c)
320 new_dead_connections.append(DeadTransport(c))
321 self.connections = new_connections
322 self.dead_connections = new_dead_connections
323
325 """
326 Get the stats for this topic
327 @return: stats for topic in getBusInfo() format::
328 ((connection_id, destination_caller_id, direction, transport, topic_name, connected)*)
329 @rtype: list
330 """
331
332 connections = self.connections
333 dead_connections = self.dead_connections
334 return [(c.id, c.endpoint_id, c.direction, c.transport_type, self.resolved_name, True) for c in connections] + \
335 [(c.id, c.endpoint_id, c.direction, c.transport_type, self.resolved_name, False) for c in dead_connections]
336
338 """Get the stats for this topic (API stub)"""
339 raise Exception("subclasses must override")
340
341
342
343
344
346 """
347 Class for registering as a subscriber to a specified topic, where
348 the messages are of a given type.
349 """
350 - def __init__(self, name, data_class, callback=None, callback_args=None,
351 queue_size=None, buff_size=DEFAULT_BUFF_SIZE, tcp_nodelay=False):
352 """
353 Constructor.
354
355 NOTE: for the queue_size and buff_size
356 parameters, rospy does not attempt to do intelligent merging
357 between multiple Subscriber instances for the same topic. As
358 they share the same underlying transport, multiple Subscribers
359 to the same topic can conflict with one another if they set
360 these parameters differently.
361
362 @param name: graph resource name of topic, e.g. 'laser'.
363 @type name: str
364 @param data_class: data type class to use for messages,
365 e.g. std_msgs.msg.String
366 @type data_class: L{Message} class
367 @param callback: function to call ( fn(data)) when data is
368 received. If callback_args is set, the function must accept
369 the callback_args as a second argument, i.e. fn(data,
370 callback_args). NOTE: Additional callbacks can be added using
371 add_callback().
372 @type callback: str
373 @param callback_args: additional arguments to pass to the
374 callback. This is useful when you wish to reuse the same
375 callback for multiple subscriptions.
376 @type callback_args: any
377 @param queue_size: maximum number of messages to receive at
378 a time. This will generally be 1 or None (infinite,
379 default). buff_size should be increased if this parameter
380 is set as incoming data still needs to sit in the incoming
381 buffer before being discarded. Setting queue_size
382 buff_size to a non-default value affects all subscribers to
383 this topic in this process.
384 @type queue_size: int
385 @param buff_size: incoming message buffer size in bytes. If
386 queue_size is set, this should be set to a number greater
387 than the queue_size times the average message size. Setting
388 buff_size to a non-default value affects all subscribers to
389 this topic in this process.
390 @type buff_size: int
391 @param tcp_nodelay: if True, request TCP_NODELAY from
392 publisher. Use of this option is not generally recommended
393 in most cases as it is better to rely on timestamps in
394 message data. Setting tcp_nodelay to True enables TCP_NODELAY
395 for all subscribers in the same python process.
396 @type tcp_nodelay: bool
397 @raise ROSException: if parameters are invalid
398 """
399 super(Subscriber, self).__init__(name, data_class, Registration.SUB)
400
401
402
403
404 if queue_size is not None:
405 self.impl.set_queue_size(queue_size)
406 if buff_size != DEFAULT_BUFF_SIZE:
407 self.impl.set_buff_size(buff_size)
408
409 if callback is not None:
410
411
412
413 self.impl.add_callback(callback, callback_args)
414
415 self.callback = callback
416 self.callback_args = callback_args
417 else:
418
419 self.callback = self.callback_args = None
420 if tcp_nodelay:
421 self.impl.set_tcp_nodelay(tcp_nodelay)
422
424 """
425 unpublish/unsubscribe from topic. Topic instance is no longer
426 valid after this call. Additional calls to unregister() have no effect.
427 """
428 if self.impl:
429
430
431 if self.callback is not None:
432 self.impl.remove_callback(self.callback, self.callback_args)
433 self.callback = self.callback_args = None
434 super(Subscriber, self).unregister()
435
437 """
438 Underyling L{_TopicImpl} implementation for subscriptions.
439 """
441 """
442 ctor.
443 @param name: graph resource name of topic, e.g. 'laser'.
444 @type name: str
445 @param data_class: Message data class
446 @type data_class: L{Message} class
447 """
448 super(_SubscriberImpl, self).__init__(name, data_class)
449
450
451
452 self.callbacks = []
453 self.queue_size = None
454 self.buff_size = DEFAULT_BUFF_SIZE
455 self.tcp_nodelay = False
456
458 """close I/O and release resources"""
459 _TopicImpl.close(self)
460 if self.callbacks:
461 del self.callbacks[:]
462 self.callbacks = None
463
465 """
466 Set the value of TCP_NODELAY, which causes the Nagle algorithm
467 to be disabled for future topic connections, if the publisher
468 supports it.
469 """
470 self.tcp_nodelay = tcp_nodelay
471
473 """
474 Set the receive queue size. If more than queue_size messages
475 are waiting to be deserialized, they are discarded.
476
477 @param queue_size int: incoming queue size. Must be positive integer or None.
478 @type queue_size: int
479 """
480 if queue_size == -1:
481 self.queue_size = None
482 elif queue_size == 0:
483 raise ROSException("queue size may not be set to zero")
484 elif queue_size is not None and type(queue_size) != int:
485 raise ROSException("queue size must be an integer")
486 else:
487 self.queue_size = queue_size
488
490 """
491 Set the receive buffer size. The exact meaning of this is
492 transport dependent.
493 @param buff_size: receive buffer size
494 @type buff_size: int
495 """
496 if type(buff_size) != int:
497 raise ROSException("buffer size must be an integer")
498 elif buff_size <= 0:
499 raise ROSException("buffer size must be a positive integer")
500 self.buff_size = buff_size
501
503 """
504 Get the stats for this topic subscriber
505 @return: stats for topic in getBusStats() publisher format::
506 (topicName, connStats)
507 where connStats is::
508 [connectionId, bytesReceived, numSent, dropEstimate, connected]*
509 @rtype: list
510 """
511
512 conn = self.connections
513 dead_conn = self.dead_connections
514
515 stats = (self.resolved_name,
516 [(c.id, c.stat_bytes, c.stat_num_msg, -1, not c.done)
517 for c in chain(conn, dead_conn)] )
518 return stats
519
521 """
522 Register a callback to be invoked whenever a new message is received
523 @param cb: callback function to invoke with message data
524 instance, i.e. fn(data). If callback args is set, they will
525 be passed in as the second argument.
526 @type cb: fn(msg)
527 @param cb_cargs: additional arguments to pass to callback
528 @type cb_cargs: Any
529 """
530 if self.closed:
531 raise ROSException("subscriber [%s] has been closed"%(self.resolved_name))
532 with self.c_lock:
533
534
535 new_callbacks = self.callbacks[:]
536 new_callbacks.append((cb, cb_args))
537 self.callbacks = new_callbacks
538
539
540 for c in self.connections:
541 if c.latch is not None:
542 self._invoke_callback(c.latch, cb, cb_args)
543
545 """
546 Unregister a message callback.
547 @param cb: callback function
548 @type cb: fn(msg)
549 @param cb_cargs: additional arguments associated with callback
550 @type cb_cargs: Any
551 @raise KeyError: if no matching callback
552 """
553 if self.closed:
554 return
555 with self.c_lock:
556
557
558 matches = [x for x in self.callbacks if x[0] == cb and x[1] == cb_args]
559 if matches:
560 new_callbacks = self.callbacks[:]
561
562 new_callbacks.remove(matches[0])
563 self.callbacks = new_callbacks
564 if not matches:
565 raise KeyError("no matching cb")
566
568 """
569 Invoke callback on msg. Traps and logs any exceptions raise by callback
570 @param msg: message data
571 @type msg: L{Message}
572 @param cb: callback
573 @type cb: fn(msg, cb_args)
574 @param cb_args: callback args or None
575 @type cb_args: Any
576 """
577 try:
578 if cb_args is not None:
579 cb(msg, cb_args)
580 else:
581 cb(msg)
582 except Exception as e:
583 if not is_shutdown():
584 logerr("bad callback: %s\n%s"%(cb, traceback.format_exc()))
585 else:
586 _logger.warn("during shutdown, bad callback: %s\n%s"%(cb, traceback.format_exc()))
587
589 """
590 Called by underlying connection transport for each new message received
591 @param msgs: message data
592 @type msgs: [L{Message}]
593 """
594
595 callbacks = self.callbacks
596 for msg in msgs:
597 for cb, cb_args in callbacks:
598 self._invoke_callback(msg, cb, cb_args)
599
601 """
602 Callback API to receive notifications when new subscribers
603 connect and disconnect.
604 """
605
607 """
608 callback when a peer has subscribed from a topic
609 @param topic_name: topic name. NOTE: topic name will be resolved/remapped
610 @type topic_name: str
611 @param topic_publish: method to publish message data to all subscribers
612 @type topic_publish: fn(data)
613 @param peer_publish: method to publish message data to
614 new subscriber. NOTE: behavior for the latter is
615 transport-dependent as some transports may be broadcast only.
616 @type peer_publish: fn(data)
617 """
618 pass
619
621 """
622 callback when a peer has unsubscribed from a topic
623 @param topic_name: topic name. NOTE: topic name will be resolved/remapped
624 @type topic_name: str
625 @param num_peers: number of remaining peers subscribed to topic
626 @type num_peers: int
627 """
628 pass
629
630
631
632
633
634
635
637 """
638 Class for registering as a publisher of a ROS topic.
639 """
640
641 - def __init__(self, name, data_class, subscriber_listener=None, tcp_nodelay=False, latch=False, headers=None):
642 """
643 Constructor
644 @param name: resource name of topic, e.g. 'laser'.
645 @type name: str
646 @param data_class: message class for serialization
647 @type data_class: L{Message} class
648 @param subscriber_listener: listener for
649 subscription events. May be None.
650 @type subscriber_listener: L{SubscribeListener}
651 @param tcp_nodelay: If True, sets TCP_NODELAY on
652 publisher's socket (disables Nagle algorithm). This results
653 in lower latency publishing at the cost of efficiency.
654 @type tcp_nodelay: bool
655 @param latch: If True, the last message published is
656 'latched', meaning that any future subscribers will be sent
657 that message immediately upon connection.
658 @type latch: bool
659 @raise ROSException: if parameters are invalid
660 """
661 super(Publisher, self).__init__(name, data_class, Registration.PUB)
662
663 if subscriber_listener:
664 self.impl.add_subscriber_listener(subscriber_listener)
665 if tcp_nodelay:
666 get_tcpros_handler().set_tcp_nodelay(self.resolved_name, tcp_nodelay)
667 if latch:
668 self.impl.enable_latch()
669 if headers:
670 self.impl.add_headers(headers)
671
673 """
674 Publish message data object to this topic.
675 Publish can either be called with the message instance to
676 publish or with the constructor args for a new Message
677 instance, i.e.::
678 pub.publish(message_instance)
679 pub.publish(message_field_1, message_field_2...)
680 pub.publish(message_field_1='foo', message_field_2='bar')
681
682 @param args : L{Message} instance, message arguments, or no args if keyword arguments are used
683 @param kwds : Message keyword arguments. If kwds are used, args must be unset
684 @raise ROSException: If rospy node has not been initialized
685 @raise ROSSerializationException: If unable to serialize
686 message. This is usually a type error with one of the fields.
687 """
688 if self.impl is None:
689 raise ROSException("publish() to an unregistered() handle")
690 if not is_initialized():
691 raise ROSException("ROS node has not been initialized yet. Please call init_node() first")
692 data = args_kwds_to_message(self.data_class, args, kwds)
693 try:
694 self.impl.acquire()
695 self.impl.publish(data)
696 except roslib.message.SerializationError as e:
697
698 _logger.error(traceback.format_exc(e))
699 raise ROSSerializationException(str(e))
700 finally:
701 self.impl.release()
702
704 """
705 Underyling L{_TopicImpl} implementation for publishers.
706 """
707
709 """
710 @param name: name of topic, e.g. 'laser'.
711 @type name: str
712 @param data_class: Message data class
713 @type data_class: L{Message} class
714 """
715 super(_PublisherImpl, self).__init__(name, data_class)
716 if python3 == 0:
717 self.buff = StringIO()
718 else:
719 self.buff = BytesIO()
720 self.publock = threading.RLock()
721 self.subscriber_listeners = []
722
723
724 self.headers = {}
725
726
727 self.is_latch = False
728 self.latch = None
729
730
731 self.message_data_sent = 0
732
734 """close I/O and release resources"""
735 _TopicImpl.close(self)
736
737 if self.subscriber_listeners:
738 del self.subscriber_listeners[:]
739 if self.headers:
740 self.headers.clear()
741 if self.buff is not None:
742 self.buff.close()
743 self.publock = self.headers = self.buff = self.subscriber_listeners = None
744
746 """
747 Add connection headers to this Topic for future connections.
748 @param headers: key/values will be added to current connection
749 header set, overriding any existing keys if they conflict.
750 @type headers: dict
751 """
752 self.headers.update(headers)
753
755 """
756 Enable publish() latch. The latch contains the last published
757 message and is sent to any new subscribers.
758 """
759 self.is_latch = True
760
762 """
763 Get the stats for this topic publisher
764 @return: stats for topic in getBusStats() publisher format::
765 [topicName, messageDataBytes, connStats],
766 where connStats is::
767 [id, bytes, numMessages, connected]*
768 @rtype: list
769 """
770
771 conn = self.connections
772 dead_conn = self.dead_connections
773 return (self.resolved_name, self.message_data_sent,
774 [(c.id, c.stat_bytes, c.stat_num_msg, not c.done) for c in chain(conn, dead_conn)] )
775
777 """
778 Add a L{SubscribeListener} for subscribe events.
779 @param l: listener instance
780 @type l: L{SubscribeListener}
781 """
782 self.subscriber_listeners.append(l)
783
785 """lock for thread-safe publishing to this transport"""
786 if self.publock is not None:
787 self.publock.acquire()
788
790 """lock for thread-safe publishing to this transport"""
791 if self.publock is not None:
792 self.publock.release()
793
795 """
796 Add a connection to this topic. This must be a PubTransport. If
797 the latch is enabled, c will be sent a the value of the
798 latch.
799 @param c: connection instance
800 @type c: L{Transport}
801 @return: True if connection was added
802 @rtype: bool
803 """
804 super(_PublisherImpl, self).add_connection(c)
805 def publish_single(data):
806 self.publish(data, connection_override=c)
807 for l in self.subscriber_listeners:
808 l.peer_subscribe(self.resolved_name, self.publish, publish_single)
809 if self.is_latch and self.latch is not None:
810 with self.publock:
811 self.publish(self.latch, connection_override=c)
812 return True
813
815 """
816 Remove existing connection from this topic.
817 @param c: connection instance to remove
818 @type c: L{Transport}
819 """
820 super(_PublisherImpl, self).remove_connection(c)
821 num = len(self.connections)
822 for l in self.subscriber_listeners:
823 l.peer_unsubscribe(self.resolved_name, num)
824
825 - def publish(self, message, connection_override=None):
826 """
827 Publish the data to the topic. If the topic has no subscribers,
828 the method will return without any affect. Access to publish()
829 should be locked using acquire() and release() in order to
830 ensure proper message publish ordering.
831
832 @param message: message data instance to publish
833 @type message: L{Message}
834 @param connection_override: publish to this connection instead of all
835 @type connection_override: L{Transport}
836 @return: True if the data was published, False otherwise.
837 @rtype: bool
838 @raise roslib.message.SerializationError: if L{Message} instance is unable to serialize itself
839 @raise rospy.ROSException: if topic has been closed or was closed during publish()
840 """
841
842
843 if self.closed:
844
845
846 if not is_shutdown():
847 raise ROSException("publish() to a closed topic")
848 else:
849 return
850
851 if self.is_latch:
852 self.latch = message
853
854 if not self.has_connections():
855
856 return False
857
858 if connection_override is None:
859
860 conns = self.connections
861 else:
862 conns = [connection_override]
863
864
865
866 b = self.buff
867 try:
868 b.tell()
869
870
871 self.seq += 1
872 serialize_message(b, self.seq, message)
873
874
875 err_con = []
876 data = b.getvalue()
877
878 for c in conns:
879 try:
880 if not is_shutdown():
881 c.write_data(data)
882 except TransportTerminated as e:
883 logdebug("publisher connection to [%s] terminated, see errorlog for details:\n%s"%(c.endpoint_id, traceback.format_exc()))
884 err_con.append(c)
885 except Exception as e:
886
887 logdebug("publisher connection to [%s] terminated, see errorlog for details:\n%s"%(c.endpoint_id, traceback.format_exc()))
888 err_con.append(c)
889
890
891 self.message_data_sent += b.tell()
892 b.seek(0)
893 b.truncate(0)
894
895 except ValueError:
896
897
898
899 if self.closed:
900 if is_shutdown():
901
902
903 return
904 else:
905
906
907 raise ROSException("topic was closed during publish()")
908 else:
909
910 raise
911
912
913 for c in err_con:
914 try:
915
916
917 c.close()
918 except:
919 pass
920
921
922
923
925 """
926 Tracks Topic objects
927 See L{get_topic_manager()} for singleton access
928 """
929
931 """ctor."""
932 super(_TopicManager, self).__init__()
933 self.pubs = {}
934 self.subs = {}
935 self.topics = set()
936 self.lock = threading.Condition()
937 self.closed = False
938 _logger.info("topicmanager initialized")
939
941 """
942 get topic publisher and subscriber connection info for getBusInfo() api
943 @return: [bus info stats]
944 See getBusInfo() API for more data structure details.
945 @rtype: list
946 """
947 with self.lock:
948 info = []
949 for s in chain(iter(self.pubs.values()), iter(self.subs.values())):
950 info.extend(s.get_stats_info())
951 return info
952
954 """
955 get topic publisher and subscriber stats for getBusStats() api
956 @return: [publisherStats, subscriberStats].
957 See getBusStats() API for more data structure details.
958 @rtype: list
959 """
960 with self.lock:
961 return [s.get_stats() for s in self.pubs.values()],\
962 [s.get_stats() for s in self.subs.values()]
963
965 """
966 Close all registered publication and subscriptions. Manager is
967 no longer usable after close.
968 """
969 self.closed = True
970 with self.lock:
971 for t in chain(iter(self.pubs.values()), iter(self.subs.values())):
972 t.close()
973 self.pubs.clear()
974 self.subs.clear()
975
976 - def _add(self, ps, rmap, reg_type):
977 """
978 Add L{_TopicImpl} instance to rmap
979 @param ps: a pub/sub impl instance
980 @type ps: L{_TopicImpl}
981 @param rmap: { topic: _TopicImpl} rmap to record instance in
982 @type rmap: dict
983 @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB}
984 @type reg_type: str
985 """
986 resolved_name = ps.resolved_name
987 _logger.debug("tm._add: %s, %s, %s", resolved_name, ps.type, reg_type)
988 with self.lock:
989 rmap[resolved_name] = ps
990 self.topics.add(resolved_name)
991
992
993
994 get_registration_listeners().notify_added(resolved_name, ps.type, reg_type)
995
997 """recalculate self.topics. expensive"""
998 self.topics = set([x.resolved_name for x in self.pubs.values()] +
999 [x.resolved_name for x in self.subs.values()])
1000
1001 - def _remove(self, ps, rmap, reg_type):
1002 """
1003 Remove L{_TopicImpl} instance from rmap
1004 @param ps: a pub/sub impl instance
1005 @type ps: L{_TopicImpl}
1006 @param rmap: topic->_TopicImpl rmap to remove instance in
1007 @type rmap: dict
1008 @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB}
1009 @type reg_type: str
1010 """
1011 resolved_name = ps.resolved_name
1012 _logger.debug("tm._remove: %s, %s, %s", resolved_name, ps.type, reg_type)
1013 with self.lock:
1014 del rmap[resolved_name]
1015 self. _recalculate_topics()
1016
1017
1018
1019 get_registration_listeners().notify_removed(resolved_name, ps.type, reg_type)
1020
1021 - def get_impl(self, reg_type, resolved_name):
1022 """
1023 Get the L{_TopicImpl} for the specified topic. This is mainly for
1024 testing purposes. Unlike acquire_impl, it does not alter the
1025 ref count.
1026 @param resolved_name: resolved topic name
1027 @type resolved_name: str
1028 @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB}
1029 @type reg_type: str
1030 """
1031 if reg_type == Registration.PUB:
1032 rmap = self.pubs
1033 elif reg_type == Registration.SUB:
1034 rmap = self.subs
1035 else:
1036 raise TypeError("invalid reg_type: %s"%s)
1037 return rmap.get(resolved_name, None)
1038
1039 - def acquire_impl(self, reg_type, resolved_name, data_class):
1040 """
1041 Acquire a L{_TopicImpl} for the specified topic (create one if it
1042 doesn't exist). Every L{Topic} instance has a _TopicImpl that
1043 actually controls the topic resources so that multiple Topic
1044 instances use the same underlying connections. 'Acquiring' a
1045 topic implementation marks that another Topic instance is
1046 using the TopicImpl.
1047
1048 @param resolved_name: resolved topic name
1049 @type resolved_name: str
1050
1051 @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB}
1052 @type reg_type: str
1053
1054 @param data_class: message class for topic
1055 @type data_class: L{Message} class
1056 """
1057 if reg_type == Registration.PUB:
1058 rmap = self.pubs
1059 impl_class = _PublisherImpl
1060 elif reg_type == Registration.SUB:
1061 rmap = self.subs
1062 impl_class = _SubscriberImpl
1063 else:
1064 raise TypeError("invalid reg_type: %s"%s)
1065 with self.lock:
1066 impl = rmap.get(resolved_name, None)
1067 if not impl:
1068 impl = impl_class(resolved_name, data_class)
1069 self._add(impl, rmap, reg_type)
1070 impl.ref_count += 1
1071 return impl
1072
1074 """
1075 Release a L_{TopicImpl} for the specified topic.
1076
1077 Every L{Topic} instance has a _TopicImpl that actually
1078 controls the topic resources so that multiple Topic instances
1079 use the same underlying connections. 'Acquiring' a topic
1080 implementation marks that another Topic instance is using the
1081 TopicImpl.
1082
1083 @param resolved_name: resolved topic name
1084 @type resolved_name: str
1085 @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB}
1086 @type reg_type: str
1087 """
1088 if reg_type == Registration.PUB:
1089 rmap = self.pubs
1090 else:
1091 rmap = self.subs
1092 with self.lock:
1093
1094 if self.closed:
1095 return
1096 impl = rmap.get(resolved_name, None)
1097 assert impl is not None, "cannot release topic impl as impl [%s] does not exist"%resolved_name
1098 impl.ref_count -= 1
1099 assert impl.ref_count >= 0, "topic impl's reference count has gone below zero"
1100 if impl.ref_count == 0:
1101 _logger.debug("topic impl's ref count is zero, deleting topic %s...", resolved_name)
1102 impl.close()
1103 self._remove(impl, rmap, reg_type)
1104 del impl
1105 _logger.debug("... done deleting topic %s", resolved_name)
1106
1108 """
1109 @param resolved_name: resolved topic name
1110 @type resolved_name: str
1111 @return: list of L{_PublisherImpl}s
1112 @rtype: [L{_PublisherImpl}]
1113 """
1114 return self.pubs.get(resolved_name, None)
1115
1117 """
1118 @param resolved_name: topic name
1119 @type resolved_name: str
1120 @return: subscriber for the specified topic.
1121 @rtype: L{_SubscriberImpl}
1122 """
1123 return self.subs.get(resolved_name, None)
1124
1126 """
1127 @param resolved_name: resolved topic name
1128 @type resolved_name: str
1129 @return: True if manager has subscription for specified topic
1130 @rtype: bool
1131 """
1132 return resolved_name in self.subs
1133
1135 """
1136 @param resolved_name: resolved topic name
1137 @type resolved_name: str
1138 @return: True if manager has publication for specified topic
1139 @rtype: bool
1140 """
1141 return resolved_name in self.pubs
1142
1144 """
1145 @return: list of topic names this node subscribes to/publishes
1146 @rtype: [str]
1147 """
1148 return self.topics
1149
1151 return [[k, v.type] for k, v in rmap.items()]
1152
1153
1155 return self._get_list(self.subs)
1156
1157
1159 return self._get_list(self.pubs)
1160
1161 set_topic_manager(_TopicManager())
1162