1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 """
36 rospy implementation of topics.
37
38 Client API
39 ==========
40
41 L{Publisher} and L{Subscriber} are the client API for topics.
42
43 Internal Implementation
44 =======================
45
46 Internally, L{_TopicImpl} instances (_PublisherImpl/_SubscriberImpl)
47 are used to manage actual transport connections. The L{_TopicManager}
48 is responsible for tracking the system-wide state of publications and
49 subscriptions as well as the L{_TopicImpl} instances. More info is below.
50
51 L{_TopicManager}
52 ================
53
54 The L{_TopicManager} does the backend topic bookkeeping for the local
55 node. Use L{get_topic_manager()} to access singleton. Actual topic
56 implementations are done through the
57 L{_TopicImpl}/L{_PublisherImpl}/L{_SubscriberImpl} hierarchy. Client
58 code generates instances of type L{Publisher}/L{Subscriber}, which
59 enable to client to create multiple publishers/subscribers of that
60 topic that get controlled access to the underlying share connections.
61
62 Common parent classes for all rospy topics. The rospy topic autogenerators
63 create classes that are children of these implementations.
64 """
65
66 from __future__ import with_statement
67 import struct, cStringIO, thread, threading, logging, time
68 from itertools import chain
69 import traceback
70
71 import roslib.message
72
73 from rospy.core import *
74 from rospy.exceptions import ROSSerializationException, TransportTerminated
75 from rospy.msg import serialize_message, args_kwds_to_message
76
77 from rospy.impl.registration import get_topic_manager, set_topic_manager, Registration, get_registration_listeners
78 from rospy.impl.tcpros import get_tcpros_handler, DEFAULT_BUFF_SIZE
79 from rospy.impl.transport import DeadTransport
80
81 _logger = logging.getLogger('rospy.topics')
82
83
84 Message = roslib.message.Message
85
86
87
88
89
90
91
92
94 """Base class of L{Publisher} and L{Subscriber}"""
95
96 - def __init__(self, name, data_class, reg_type):
97 """
98 @param name: graph resource name of topic, e.g. 'laser'.
99 @type name: str
100 @param data_class: message class for serialization
101 @type data_class: L{Message}
102 @param reg_type Registration.PUB or Registration.SUB
103 @type reg_type: str
104 @raise ValueError: if parameters are invalid
105 """
106
107 if not name or not isinstance(name, basestring):
108 raise ValueError("topic name is not a non-empty string")
109 if isinstance(name, unicode):
110 raise ValueError("topic name cannot be unicode")
111 if data_class is None:
112 raise ValueError("topic parameter 'data_class' is not initialized")
113 if not type(data_class) == type:
114 raise ValueError("data_class [%s] is not a class"%data_class)
115 if not issubclass(data_class, roslib.message.Message):
116 raise ValueError("data_class [%s] is not a message data class"%data_class.__class__.__name__)
117
118 if not roslib.names.is_legal_name(name):
119 import warnings
120 warnings.warn("'%s' is not a legal ROS graph resource name. This may cause problems with other ROS tools"%name, stacklevel=2)
121
122
123
124 if not rospy.core.is_initialized():
125 self.resolved_name = rospy.names.resolve_name_without_node_name(name)
126 else:
127
128 self.resolved_name = resolve_name(name)
129
130 self.name = self.resolved_name
131
132 self.data_class = data_class
133 self.type = data_class._type
134 self.md5sum = data_class._md5sum
135 self.reg_type = reg_type
136 self.impl = get_topic_manager().acquire_impl(reg_type, self.resolved_name, data_class)
137
139 """
140 get the number of connections to other ROS nodes for this topic. For a Publisher,
141 this corresponds to the number of nodes subscribing. For a Subscriber, the number
142 of publishers.
143 @return: number of connections
144 @rtype: int
145 """
146 return self.impl.get_num_connections()
147
149 """
150 unpublish/unsubscribe from topic. Topic instance is no longer
151 valid after this call. Additional calls to unregister() have no effect.
152 """
153
154
155 resolved_name = self.resolved_name
156 if resolved_name and self.impl:
157 get_topic_manager().release_impl(self.reg_type, resolved_name)
158 self.impl = self.resolved_name = self.type = self.md5sum = self.data_class = None
159
161 """
162 Base class of internal topic implementations. Each topic has a
163 singleton _TopicImpl implementation for managing the underlying
164 connections.
165 """
166
168 """
169 Base constructor
170 @param name: graph resource name of topic, e.g. 'laser'.
171 @type name: str
172 @param data_class: message data class
173 @type data_class: L{Message}
174 """
175
176
177 self.resolved_name = resolve_name(name)
178 self.name = self.resolved_name
179
180 self.data_class = data_class
181 self.type = data_class._type
182 self.handler = None
183 self.seq = 0
184
185
186
187
188
189
190
191 self.c_lock = threading.RLock()
192 self.connections = []
193 self.closed = False
194
195 self.ref_count = 0
196
197 self.dead_connections = []
198
200
201 if self.closed:
202 return
203 if self.connections is not None:
204 for c in self.connections:
205 try:
206 c.close()
207 except:
208 pass
209 del self.connections[:]
210 del self.dead_connections[:]
211 self.c_lock = self.dead_connections = self.connections = self.handler = self.data_class = self.type = None
212
214 """close I/O"""
215 if self.closed:
216 return
217 self.closed = True
218 if self.c_lock is not None:
219 with self.c_lock:
220 for c in self.connections:
221 try:
222 if c is not None:
223 c.close()
224 except:
225
226 _logger.error(traceback.format_exc())
227 del self.connections[:]
228 self.handler = None
229
230
231
232
233
234
236 with self.c_lock:
237 return len(self.connections)
238
240 """
241 Query whether or not a connection with the associated \a
242 endpoint has been added to this object.
243 @param endpoint_id: endpoint ID associated with connection.
244 @type endpoint_id: str
245 """
246
247 conn = self.connections
248 for c in conn:
249 if c.endpoint_id == endpoint_id:
250 return True
251 return False
252
254 """
255 Check to see if this topic is connected to other publishers/subscribers
256 @return: True if topic is connected
257 @rtype: bool
258 """
259 if self.connections:
260 return True
261 return False
262
264 """
265 Add a connection to this topic.
266 @param c: connection instance
267 @type c: Transport
268 @return: True if connection was added
269 @rtype: bool
270 """
271 with self.c_lock:
272
273
274
275 new_connections = self.connections[:]
276 new_connections.append(c)
277 self.connections = new_connections
278
279
280 c.set_cleanup_callback(self.remove_connection)
281
282 return True
283
285 """
286 Remove connection from topic.
287 @param c: connection instance to remove
288 @type c: Transport
289 """
290 with self.c_lock:
291
292
293
294 new_connections = self.connections[:]
295 new_dead_connections = self.dead_connections[:]
296 if c in new_connections:
297 new_connections.remove(c)
298 new_dead_connections.append(DeadTransport(c))
299 self.connections = new_connections
300 self.dead_connections = new_dead_connections
301
303 """
304 Get the stats for this topic
305 @return: stats for topic in getBusInfo() format::
306 ((connection_id, destination_caller_id, direction, transport, topic_name, connected)*)
307 @rtype: list
308 """
309
310 connections = self.connections
311 dead_connections = self.dead_connections
312 return [(c.id, c.endpoint_id, c.direction, c.transport_type, self.resolved_name, True) for c in connections] + \
313 [(c.id, c.endpoint_id, c.direction, c.transport_type, self.resolved_name, False) for c in dead_connections]
314
316 """Get the stats for this topic (API stub)"""
317 raise Exception("subclasses must override")
318
319
320
321
322
324 """
325 Class for registering as a subscriber to a specified topic, where
326 the messages are of a given type.
327 """
328 - def __init__(self, name, data_class, callback=None, callback_args=None,
329 queue_size=None, buff_size=DEFAULT_BUFF_SIZE, tcp_nodelay=False):
330 """
331 Constructor.
332
333 NOTE: for the queue_size and buff_size
334 parameters, rospy does not attempt to do intelligent merging
335 between multiple Subscriber instances for the same topic. As
336 they share the same underlying transport, multiple Subscribers
337 to the same topic can conflict with one another if they set
338 these parameters differently.
339
340 @param name: graph resource name of topic, e.g. 'laser'.
341 @type name: str
342 @param data_class: data type class to use for messages,
343 e.g. std_msgs.msg.String
344 @type data_class: L{Message} class
345 @param callback: function to call ( fn(data)) when data is
346 received. If callback_args is set, the function must accept
347 the callback_args as a second argument, i.e. fn(data,
348 callback_args). NOTE: Additional callbacks can be added using
349 add_callback().
350 @type callback: str
351 @param callback_args: additional arguments to pass to the
352 callback. This is useful when you wish to reuse the same
353 callback for multiple subscriptions.
354 @type callback_args: any
355 @param queue_size: maximum number of messages to receive at
356 a time. This will generally be 1 or None (infinite,
357 default). buff_size should be increased if this parameter
358 is set as incoming data still needs to sit in the incoming
359 buffer before being discarded. Setting queue_size
360 buff_size to a non-default value affects all subscribers to
361 this topic in this process.
362 @type queue_size: int
363 @param buff_size: incoming message buffer size in bytes. If
364 queue_size is set, this should be set to a number greater
365 than the queue_size times the average message size. Setting
366 buff_size to a non-default value affects all subscribers to
367 this topic in this process.
368 @type buff_size: int
369 @param tcp_nodelay: if True, request TCP_NODELAY from
370 publisher. Use of this option is not generally recommended
371 in most cases as it is better to rely on timestamps in
372 message data. Setting tcp_nodelay to True enables TCP_NODELAY
373 for all subscribers in the same python process.
374 @type tcp_nodelay: bool
375 @raise ROSException: if parameters are invalid
376 """
377 super(Subscriber, self).__init__(name, data_class, Registration.SUB)
378
379
380
381
382 if queue_size is not None:
383 self.impl.set_queue_size(queue_size)
384 if buff_size != DEFAULT_BUFF_SIZE:
385 self.impl.set_buff_size(buff_size)
386
387 if callback is not None:
388
389
390
391 self.impl.add_callback(callback, callback_args)
392
393 self.callback = callback
394 self.callback_args = callback_args
395 else:
396
397 self.callback = self.callback_args = None
398 if tcp_nodelay:
399 self.impl.set_tcp_nodelay(tcp_nodelay)
400
402 """
403 unpublish/unsubscribe from topic. Topic instance is no longer
404 valid after this call. Additional calls to unregister() have no effect.
405 """
406 if self.impl:
407
408
409 if self.callback is not None:
410 self.impl.remove_callback(self.callback, self.callback_args)
411 self.callback = self.callback_args = None
412 super(Subscriber, self).unregister()
413
415 """
416 Underyling L{_TopicImpl} implementation for subscriptions.
417 """
418
420 """
421 ctor.
422 @param name: graph resource name of topic, e.g. 'laser'.
423 @type name: str
424 @param data_class: Message data class
425 @type data_class: L{Message} class
426 """
427 super(_SubscriberImpl, self).__init__(name, data_class)
428
429
430
431 self.callbacks = []
432 self.queue_size = None
433 self.buff_size = DEFAULT_BUFF_SIZE
434 self.tcp_nodelay = False
435
437 """close I/O and release resources"""
438 _TopicImpl.close(self)
439 if self.callbacks:
440 del self.callbacks[:]
441 self.callbacks = None
442
444 """
445 Set the value of TCP_NODELAY, which causes the Nagle algorithm
446 to be disabled for future topic connections, if the publisher
447 supports it.
448 """
449 self.tcp_nodelay = tcp_nodelay
450
452 """
453 Set the receive queue size. If more than queue_size messages
454 are waiting to be deserialized, they are discarded.
455
456 @param queue_size int: incoming queue size. Must be positive integer or None.
457 @type queue_size: int
458 """
459 if queue_size == -1:
460 self.queue_size = None
461 elif queue_size == 0:
462 raise ROSException("queue size may not be set to zero")
463 elif queue_size is not None and type(queue_size) != int:
464 raise ROSException("queue size must be an integer")
465 else:
466 self.queue_size = queue_size
467
469 """
470 Set the receive buffer size. The exact meaning of this is
471 transport dependent.
472 @param buff_size: receive buffer size
473 @type buff_size: int
474 """
475 if type(buff_size) != int:
476 raise ROSException("buffer size must be an integer")
477 elif buff_size <= 0:
478 raise ROSException("buffer size must be a positive integer")
479 self.buff_size = buff_size
480
482 """
483 Get the stats for this topic subscriber
484 @return: stats for topic in getBusStats() publisher format::
485 (topicName, connStats)
486 where connStats is::
487 [connectionId, bytesReceived, numSent, dropEstimate, connected]*
488 @rtype: list
489 """
490
491 conn = self.connections
492 dead_conn = self.dead_connections
493
494 stats = (self.resolved_name,
495 [(c.id, c.stat_bytes, c.stat_num_msg, -1, not c.done)
496 for c in chain(conn, dead_conn)] )
497 return stats
498
500 """
501 Register a callback to be invoked whenever a new message is received
502 @param cb: callback function to invoke with message data
503 instance, i.e. fn(data). If callback args is set, they will
504 be passed in as the second argument.
505 @type cb: fn(msg)
506 @param cb_cargs: additional arguments to pass to callback
507 @type cb_cargs: Any
508 """
509 if self.closed:
510 raise ROSException("subscriber [%s] has been closed"%(self.resolved_name))
511 with self.c_lock:
512
513
514 new_callbacks = self.callbacks[:]
515 new_callbacks.append((cb, cb_args))
516 self.callbacks = new_callbacks
517
518
519 for c in self.connections:
520 if c.latch is not None:
521 self._invoke_callback(c.latch, cb, cb_args)
522
524 """
525 Unregister a message callback.
526 @param cb: callback function
527 @type cb: fn(msg)
528 @param cb_cargs: additional arguments associated with callback
529 @type cb_cargs: Any
530 @raise KeyError: if no matching callback
531 """
532 if self.closed:
533 return
534 with self.c_lock:
535
536
537 matches = [x for x in self.callbacks if x[0] == cb and x[1] == cb_args]
538 if matches:
539 new_callbacks = self.callbacks[:]
540
541 new_callbacks.remove(matches[0])
542 self.callbacks = new_callbacks
543 if not matches:
544 raise KeyError("no matching cb")
545
547 """
548 Invoke callback on msg. Traps and logs any exceptions raise by callback
549 @param msg: message data
550 @type msg: L{Message}
551 @param cb: callback
552 @type cb: fn(msg, cb_args)
553 @param cb_args: callback args or None
554 @type cb_args: Any
555 """
556 try:
557 if cb_args is not None:
558 cb(msg, cb_args)
559 else:
560 cb(msg)
561 except Exception, e:
562 if not is_shutdown():
563 logerr("bad callback: %s\n%s"%(cb, traceback.format_exc()))
564 else:
565 _logger.warn("during shutdown, bad callback: %s\n%s"%(cb, traceback.format_exc()))
566
568 """
569 Called by underlying connection transport for each new message received
570 @param msgs: message data
571 @type msgs: [L{Message}]
572 """
573
574 callbacks = self.callbacks
575 for msg in msgs:
576 for cb, cb_args in callbacks:
577 self._invoke_callback(msg, cb, cb_args)
578
580 """
581 Callback API to receive notifications when new subscribers
582 connect and disconnect.
583 """
584
586 """
587 callback when a peer has subscribed from a topic
588 @param topic_name: topic name. NOTE: topic name will be resolved/remapped
589 @type topic_name: str
590 @param topic_publish: method to publish message data to all subscribers
591 @type topic_publish: fn(data)
592 @param peer_publish: method to publish message data to
593 new subscriber. NOTE: behavior for the latter is
594 transport-dependent as some transports may be broadcast only.
595 @type peer_publish: fn(data)
596 """
597 pass
598
600 """
601 callback when a peer has unsubscribed from a topic
602 @param topic_name: topic name. NOTE: topic name will be resolved/remapped
603 @type topic_name: str
604 @param num_peers: number of remaining peers subscribed to topic
605 @type num_peers: int
606 """
607 pass
608
609
610
611
612
613
614
616 """
617 Class for registering as a publisher of a ROS topic.
618 """
619
620 - def __init__(self, name, data_class, subscriber_listener=None, tcp_nodelay=False, latch=False, headers=None):
621 """
622 Constructor
623 @param name: resource name of topic, e.g. 'laser'.
624 @type name: str
625 @param data_class: message class for serialization
626 @type data_class: L{Message} class
627 @param subscriber_listener: listener for
628 subscription events. May be None.
629 @type subscriber_listener: L{SubscribeListener}
630 @param tcp_nodelay: If True, sets TCP_NODELAY on
631 publisher's socket (disables Nagle algorithm). This results
632 in lower latency publishing at the cost of efficiency.
633 @type tcp_nodelay: bool
634 @param latch: If True, the last message published is
635 'latched', meaning that any future subscribers will be sent
636 that message immediately upon connection.
637 @type latch: bool
638 @raise ROSException: if parameters are invalid
639 """
640 super(Publisher, self).__init__(name, data_class, Registration.PUB)
641
642 if subscriber_listener:
643 self.impl.add_subscriber_listener(subscriber_listener)
644 if tcp_nodelay:
645 get_tcpros_handler().set_tcp_nodelay(self.resolved_name, tcp_nodelay)
646 if latch:
647 self.impl.enable_latch()
648 if headers:
649 self.impl.add_headers(headers)
650
652 """
653 Publish message data object to this topic.
654 Publish can either be called with the message instance to
655 publish or with the constructor args for a new Message
656 instance, i.e.::
657 pub.publish(message_instance)
658 pub.publish(message_field_1, message_field_2...)
659 pub.publish(message_field_1='foo', message_field_2='bar')
660
661 @param args : L{Message} instance, message arguments, or no args if keyword arguments are used
662 @param kwds : Message keyword arguments. If kwds are used, args must be unset
663 @raise ROSException: If rospy node has not been initialized
664 @raise ROSSerializationException: If unable to serialize
665 message. This is usually a type error with one of the fields.
666 """
667 if self.impl is None:
668 raise ROSException("publish() to an unregistered() handle")
669 if not is_initialized():
670 raise ROSException("ROS node has not been initialized yet. Please call init_node() first")
671 data = args_kwds_to_message(self.data_class, args, kwds)
672 try:
673 self.impl.acquire()
674 self.impl.publish(data)
675 except roslib.message.SerializationError, e:
676
677 _logger.error(traceback.format_exc(e))
678 raise ROSSerializationException(str(e))
679 finally:
680 self.impl.release()
681
683 """
684 Underyling L{_TopicImpl} implementation for publishers.
685 """
686
688 """
689 @param name: name of topic, e.g. 'laser'.
690 @type name: str
691 @param data_class: Message data class
692 @type data_class: L{Message} class
693 """
694 super(_PublisherImpl, self).__init__(name, data_class)
695 self.buff = cStringIO.StringIO()
696 self.publock = threading.RLock()
697 self.subscriber_listeners = []
698
699
700 self.headers = {}
701
702
703 self.is_latch = False
704 self.latch = None
705
706
707 self.message_data_sent = 0
708
710 """close I/O and release resources"""
711 _TopicImpl.close(self)
712
713 if self.subscriber_listeners:
714 del self.subscriber_listeners[:]
715 if self.headers:
716 self.headers.clear()
717 if self.buff is not None:
718 self.buff.close()
719 self.publock = self.headers = self.buff = self.subscriber_listeners = None
720
722 """
723 Add connection headers to this Topic for future connections.
724 @param headers: key/values will be added to current connection
725 header set, overriding any existing keys if they conflict.
726 @type headers: dict
727 """
728 self.headers.update(headers)
729
731 """
732 Enable publish() latch. The latch contains the last published
733 message and is sent to any new subscribers.
734 """
735 self.is_latch = True
736
738 """
739 Get the stats for this topic publisher
740 @return: stats for topic in getBusStats() publisher format::
741 [topicName, messageDataBytes, connStats],
742 where connStats is::
743 [id, bytes, numMessages, connected]*
744 @rtype: list
745 """
746
747 conn = self.connections
748 dead_conn = self.dead_connections
749 return (self.resolved_name, self.message_data_sent,
750 [(c.id, c.stat_bytes, c.stat_num_msg, not c.done) for c in chain(conn, dead_conn)] )
751
753 """
754 Add a L{SubscribeListener} for subscribe events.
755 @param l: listener instance
756 @type l: L{SubscribeListener}
757 """
758 self.subscriber_listeners.append(l)
759
761 """lock for thread-safe publishing to this transport"""
762 if self.publock is not None:
763 self.publock.acquire()
764
766 """lock for thread-safe publishing to this transport"""
767 if self.publock is not None:
768 self.publock.release()
769
771 """
772 Add a connection to this topic. This must be a PubTransport. If
773 the latch is enabled, c will be sent a the value of the
774 latch.
775 @param c: connection instance
776 @type c: L{Transport}
777 @return: True if connection was added
778 @rtype: bool
779 """
780 super(_PublisherImpl, self).add_connection(c)
781 def publish_single(data):
782 self.publish(data, connection_override=c)
783 for l in self.subscriber_listeners:
784 l.peer_subscribe(self.resolved_name, self.publish, publish_single)
785 if self.is_latch and self.latch is not None:
786 with self.publock:
787 self.publish(self.latch, connection_override=c)
788 return True
789
791 """
792 Remove existing connection from this topic.
793 @param c: connection instance to remove
794 @type c: L{Transport}
795 """
796 super(_PublisherImpl, self).remove_connection(c)
797 num = len(self.connections)
798 for l in self.subscriber_listeners:
799 l.peer_unsubscribe(self.resolved_name, num)
800
801 - def publish(self, message, connection_override=None):
802 """
803 Publish the data to the topic. If the topic has no subscribers,
804 the method will return without any affect. Access to publish()
805 should be locked using acquire() and release() in order to
806 ensure proper message publish ordering.
807
808 @param message: message data instance to publish
809 @type message: L{Message}
810 @param connection_override: publish to this connection instead of all
811 @type connection_override: L{Transport}
812 @return: True if the data was published, False otherwise.
813 @rtype: bool
814 @raise roslib.message.SerializationError: if L{Message} instance is unable to serialize itself
815 @raise rospy.ROSException: if topic has been closed or was closed during publish()
816 """
817
818
819 if self.closed:
820
821
822 if not is_shutdown():
823 raise ROSException("publish() to a closed topic")
824 else:
825 return
826
827 if self.is_latch:
828 self.latch = message
829
830 if not self.has_connections():
831
832 return False
833
834 if connection_override is None:
835
836 conns = self.connections
837 else:
838 conns = [connection_override]
839
840
841
842 b = self.buff
843 try:
844 b.tell()
845
846
847 self.seq += 1
848 serialize_message(b, self.seq, message)
849
850
851 err_con = []
852 data = b.getvalue()
853
854 for c in conns:
855 try:
856 if not is_shutdown():
857 c.write_data(data)
858 except TransportTerminated, e:
859 logdebug("publisher connection to [%s] terminated, see errorlog for details:\n%s"%(c.endpoint_id, traceback.format_exc()))
860 err_con.append(c)
861 except Exception, e:
862
863 logdebug("publisher connection to [%s] terminated, see errorlog for details:\n%s"%(c.endpoint_id, traceback.format_exc()))
864 err_con.append(c)
865
866
867 self.message_data_sent += b.tell()
868 b.seek(0)
869 b.truncate(0)
870
871 except ValueError:
872
873
874
875 if self.closed:
876 if is_shutdown():
877
878
879 return
880 else:
881
882
883 raise ROSException("topic was closed during publish()")
884 else:
885
886 raise
887
888
889 for c in err_con:
890 try:
891
892
893 c.close()
894 except:
895 pass
896
897
898
899
901 """
902 Tracks Topic objects
903 See L{get_topic_manager()} for singleton access
904 """
905
907 """ctor."""
908 super(_TopicManager, self).__init__()
909 self.pubs = {}
910 self.subs = {}
911 self.topics = set()
912 self.lock = threading.Condition()
913 self.closed = False
914 _logger.info("topicmanager initialized")
915
917 """
918 get topic publisher and subscriber connection info for getBusInfo() api
919 @return: [bus info stats]
920 See getBusInfo() API for more data structure details.
921 @rtype: list
922 """
923 with self.lock:
924 info = []
925 for s in chain(self.pubs.itervalues(), self.subs.itervalues()):
926 info.extend(s.get_stats_info())
927 return info
928
930 """
931 get topic publisher and subscriber stats for getBusStats() api
932 @return: [publisherStats, subscriberStats].
933 See getBusStats() API for more data structure details.
934 @rtype: list
935 """
936 with self.lock:
937 return [s.get_stats() for s in self.pubs.itervalues()],\
938 [s.get_stats() for s in self.subs.itervalues()]
939
941 """
942 Close all registered publication and subscriptions. Manager is
943 no longer usable after close.
944 """
945 self.closed = True
946 with self.lock:
947 for t in chain(self.pubs.itervalues(), self.subs.itervalues()):
948 t.close()
949 self.pubs.clear()
950 self.subs.clear()
951
952 - def _add(self, ps, rmap, reg_type):
953 """
954 Add L{_TopicImpl} instance to rmap
955 @param ps: a pub/sub impl instance
956 @type ps: L{_TopicImpl}
957 @param rmap: { topic: _TopicImpl} rmap to record instance in
958 @type rmap: dict
959 @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB}
960 @type reg_type: str
961 """
962 resolved_name = ps.resolved_name
963 _logger.debug("tm._add: %s, %s, %s", resolved_name, ps.type, reg_type)
964 with self.lock:
965 rmap[resolved_name] = ps
966 self.topics.add(resolved_name)
967
968
969
970 get_registration_listeners().notify_added(resolved_name, ps.type, reg_type)
971
973 """recalculate self.topics. expensive"""
974 self.topics = set([x.resolved_name for x in self.pubs.itervalues()] +
975 [x.resolved_name for x in self.subs.itervalues()])
976
977 - def _remove(self, ps, rmap, reg_type):
978 """
979 Remove L{_TopicImpl} instance from rmap
980 @param ps: a pub/sub impl instance
981 @type ps: L{_TopicImpl}
982 @param rmap: topic->_TopicImpl rmap to remove instance in
983 @type rmap: dict
984 @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB}
985 @type reg_type: str
986 """
987 resolved_name = ps.resolved_name
988 _logger.debug("tm._remove: %s, %s, %s", resolved_name, ps.type, reg_type)
989 with self.lock:
990 del rmap[resolved_name]
991 self. _recalculate_topics()
992
993
994
995 get_registration_listeners().notify_removed(resolved_name, ps.type, reg_type)
996
997 - def get_impl(self, reg_type, resolved_name):
998 """
999 Get the L{_TopicImpl} for the specified topic. This is mainly for
1000 testing purposes. Unlike acquire_impl, it does not alter the
1001 ref count.
1002 @param resolved_name: resolved topic name
1003 @type resolved_name: str
1004 @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB}
1005 @type reg_type: str
1006 """
1007 if reg_type == Registration.PUB:
1008 rmap = self.pubs
1009 elif reg_type == Registration.SUB:
1010 rmap = self.subs
1011 else:
1012 raise TypeError("invalid reg_type: %s"%s)
1013 return rmap.get(resolved_name, None)
1014
1015 - def acquire_impl(self, reg_type, resolved_name, data_class):
1016 """
1017 Acquire a L{_TopicImpl} for the specified topic (create one if it
1018 doesn't exist). Every L{Topic} instance has a _TopicImpl that
1019 actually controls the topic resources so that multiple Topic
1020 instances use the same underlying connections. 'Acquiring' a
1021 topic implementation marks that another Topic instance is
1022 using the TopicImpl.
1023
1024 @param resolved_name: resolved topic name
1025 @type resolved_name: str
1026
1027 @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB}
1028 @type reg_type: str
1029
1030 @param data_class: message class for topic
1031 @type data_class: L{Message} class
1032 """
1033 if reg_type == Registration.PUB:
1034 rmap = self.pubs
1035 impl_class = _PublisherImpl
1036 elif reg_type == Registration.SUB:
1037 rmap = self.subs
1038 impl_class = _SubscriberImpl
1039 else:
1040 raise TypeError("invalid reg_type: %s"%s)
1041 with self.lock:
1042 impl = rmap.get(resolved_name, None)
1043 if not impl:
1044 impl = impl_class(resolved_name, data_class)
1045 self._add(impl, rmap, reg_type)
1046 impl.ref_count += 1
1047 return impl
1048
1050 """
1051 Release a L_{TopicImpl} for the specified topic.
1052
1053 Every L{Topic} instance has a _TopicImpl that actually
1054 controls the topic resources so that multiple Topic instances
1055 use the same underlying connections. 'Acquiring' a topic
1056 implementation marks that another Topic instance is using the
1057 TopicImpl.
1058
1059 @param resolved_name: resolved topic name
1060 @type resolved_name: str
1061 @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB}
1062 @type reg_type: str
1063 """
1064 if reg_type == Registration.PUB:
1065 rmap = self.pubs
1066 else:
1067 rmap = self.subs
1068 with self.lock:
1069
1070 if self.closed:
1071 return
1072 impl = rmap.get(resolved_name, None)
1073 assert impl is not None, "cannot release topic impl as impl [%s] does not exist"%resolved_name
1074 impl.ref_count -= 1
1075 assert impl.ref_count >= 0, "topic impl's reference count has gone below zero"
1076 if impl.ref_count == 0:
1077 _logger.debug("topic impl's ref count is zero, deleting topic %s...", resolved_name)
1078 impl.close()
1079 self._remove(impl, rmap, reg_type)
1080 del impl
1081 _logger.debug("... done deleting topic %s", resolved_name)
1082
1084 """
1085 @param resolved_name: resolved topic name
1086 @type resolved_name: str
1087 @return: list of L{_PublisherImpl}s
1088 @rtype: [L{_PublisherImpl}]
1089 """
1090 return self.pubs.get(resolved_name, None)
1091
1093 """
1094 @param resolved_name: topic name
1095 @type resolved_name: str
1096 @return: subscriber for the specified topic.
1097 @rtype: L{_SubscriberImpl}
1098 """
1099 return self.subs.get(resolved_name, None)
1100
1102 """
1103 @param resolved_name: resolved topic name
1104 @type resolved_name: str
1105 @return: True if manager has subscription for specified topic
1106 @rtype: bool
1107 """
1108 return resolved_name in self.subs
1109
1111 """
1112 @param resolved_name: resolved topic name
1113 @type resolved_name: str
1114 @return: True if manager has publication for specified topic
1115 @rtype: bool
1116 """
1117 return resolved_name in self.pubs
1118
1120 """
1121 @return: list of topic names this node subscribes to/publishes
1122 @rtype: [str]
1123 """
1124 return self.topics
1125
1127 return [[k, v.type] for k, v in rmap.iteritems()]
1128
1129
1131 return self._get_list(self.subs)
1132
1133
1135 return self._get_list(self.pubs)
1136
1137 set_topic_manager(_TopicManager())
1138