1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 """
36 rospy implementation of topics.
37
38 Client API
39 ==========
40
41 L{Publisher} and L{Subscriber} are the client API for topics.
42
43 Internal Implementation
44 =======================
45
46 Internally, L{_TopicImpl} instances (_PublisherImpl/_SubscriberImpl)
47 are used to manage actual transport connections. The L{_TopicManager}
48 is responsible for tracking the system-wide state of publications and
49 subscriptions as well as the L{_TopicImpl} instances. More info is below.
50
51 L{_TopicManager}
52 ================
53
54 The L{_TopicManager} does the backend topic bookkeeping for the local
55 node. Use L{get_topic_manager()} to access singleton. Actual topic
56 implementations are done through the
57 L{_TopicImpl}/L{_PublisherImpl}/L{_SubscriberImpl} hierarchy. Client
58 code generates instances of type L{Publisher}/L{Subscriber}, which
59 enable to client to create multiple publishers/subscribers of that
60 topic that get controlled access to the underlying share connections.
61
62 Common parent classes for all rospy topics. The rospy topic autogenerators
63 create classes that are children of these implementations.
64 """
65
66
67 import struct
68 import select
69 try:
70 from cStringIO import StringIO
71 python3 = 0
73 return isinstance(s, basestring)
74 except ImportError:
75 python3 = 1
76 from io import StringIO, BytesIO
78 return isinstance(s, str)
79
80 import threading
81 import logging
82 import time
83
84 from itertools import chain
85 import traceback
86
87 import rosgraph.names
88
89 from rospy.core import *
90 from rospy.exceptions import ROSSerializationException, TransportTerminated
91 from rospy.msg import serialize_message, args_kwds_to_message
92
93 from rospy.impl.statistics import SubscriberStatisticsLogger
94
95 from rospy.impl.registration import get_topic_manager, set_topic_manager, Registration, get_registration_listeners
96 from rospy.impl.tcpros import get_tcpros_handler, DEFAULT_BUFF_SIZE
97 from rospy.impl.tcpros_pubsub import QueuedConnection
98
99 _logger = logging.getLogger('rospy.topics')
100
101
102 import genpy
103 Message = genpy.Message
104
105
106
107
108
109
110
111
113 """Base class of L{Publisher} and L{Subscriber}"""
114
115 - def __init__(self, name, data_class, reg_type):
116 """
117 @param name: graph resource name of topic, e.g. 'laser'.
118 @type name: str
119 @param data_class: message class for serialization
120 @type data_class: L{Message}
121 @param reg_type Registration.PUB or Registration.SUB
122 @type reg_type: str
123 @raise ValueError: if parameters are invalid
124 """
125
126 if not name or not isstring(name):
127 raise ValueError("topic name is not a non-empty string")
128 try:
129 if python3 == 1:
130 name.encode("utf-8")
131 else:
132 name = name.encode("utf-8")
133 except UnicodeError:
134 raise ValueError("topic name must be ascii/utf-8 compatible")
135 if data_class is None:
136 raise ValueError("topic parameter 'data_class' is not initialized")
137 if not type(data_class) == type:
138 raise ValueError("data_class [%s] is not a class"%data_class)
139 if not issubclass(data_class, genpy.Message):
140 raise ValueError("data_class [%s] is not a message data class"%data_class.__class__.__name__)
141
142 if not rosgraph.names.is_legal_name(name):
143 import warnings
144 warnings.warn("'%s' is not a legal ROS graph resource name. This may cause problems with other ROS tools"%name, stacklevel=2)
145
146
147
148 if not rospy.core.is_initialized():
149 self.resolved_name = rospy.names.resolve_name_without_node_name(name)
150 else:
151
152 self.resolved_name = resolve_name(name)
153
154 self.name = self.resolved_name
155
156 self.data_class = data_class
157 self.type = data_class._type
158 self.md5sum = data_class._md5sum
159 self.reg_type = reg_type
160 self.impl = get_topic_manager().acquire_impl(reg_type, self.resolved_name, data_class)
161
163 """
164 get the number of connections to other ROS nodes for this topic. For a Publisher,
165 this corresponds to the number of nodes subscribing. For a Subscriber, the number
166 of publishers.
167 @return: number of connections
168 @rtype: int
169 """
170 return self.impl.get_num_connections()
171
173 """
174 unpublish/unsubscribe from topic. Topic instance is no longer
175 valid after this call. Additional calls to unregister() have no effect.
176 """
177
178
179 resolved_name = self.resolved_name
180 if resolved_name and self.impl:
181 get_topic_manager().release_impl(self.reg_type, resolved_name)
182 self.impl = self.resolved_name = self.type = self.md5sum = self.data_class = None
183
184
186 """
187 select.poll/kqueue abstraction to handle socket failure detection
188 on multiple platforms. NOT thread-safe.
189 """
191 try:
192 self.poller = select.poll()
193 self.add_fd = self.add_poll
194 self.remove_fd = self.remove_poll
195 self.error_iter = self.error_poll_iter
196 except:
197 try:
198
199 self.poller = select.kqueue()
200 self.add_fd = self.add_kqueue
201 self.remove_fd = self.remove_kqueue
202 self.error_iter = self.error_kqueue_iter
203 self.kevents = []
204 except:
205
206 self.poller = self.noop
207 self.add_fd = self.noop
208 self.remove_fd = self.noop
209 self.error_iter = self.noop_iter
210
211 - def noop(self, *args):
213
215 empty_generator = (x for x in list())
216 for x in empty_generator:
217 yield x
218
221
224
226 events = self.poller.poll(0)
227 for fd, event in events:
228 if event & (select.POLLHUP | select.POLLERR):
229 yield fd
230
232 self.kevents.append(select.kevent(fd))
233
235 events = self.poller.control(self.kevents, 0)
236 for event in events:
237 if event & (select.KQ_EV_ERROR | select.KQ_EV_EOF):
238 yield event.ident
239
241 e = [x for x in self.kevents if x.ident == fd]
242 for x in e:
243 self.kevents.remove(x)
244
246 """
247 Base class of internal topic implementations. Each topic has a
248 singleton _TopicImpl implementation for managing the underlying
249 connections.
250 """
251
253 """
254 Base constructor
255 @param name: graph resource name of topic, e.g. 'laser'.
256 @type name: str
257 @param data_class: message data class
258 @type data_class: L{Message}
259 """
260
261
262 self.resolved_name = resolve_name(name)
263 self.name = self.resolved_name
264
265 self.data_class = data_class
266 self.type = data_class._type
267 self.handler = None
268 self.seq = 0
269
270
271
272
273
274
275
276 self.c_lock = threading.RLock()
277 self.connections = []
278 self.closed = False
279
280 self.ref_count = 0
281
282 self.connection_poll = Poller()
283
285
286 if self.closed:
287 return
288 if self.connections is not None:
289 for c in self.connections:
290 try:
291 c.close()
292 except:
293 pass
294 del self.connections[:]
295 self.c_lock = self.connections = self.handler = self.data_class = self.type = None
296
298 """close I/O"""
299 if self.closed:
300 return
301 self.closed = True
302 if self.c_lock is not None:
303 with self.c_lock:
304 for c in self.connections:
305 try:
306 if c is not None:
307 c.close()
308 except:
309
310 _logger.error(traceback.format_exc())
311 del self.connections[:]
312 self.handler = None
313
315 with self.c_lock:
316 return len(self.connections)
317
319 """
320 Query whether or not a connection with the associated \a
321 endpoint has been added to this object.
322 @param endpoint_id: endpoint ID associated with connection.
323 @type endpoint_id: str
324 """
325
326 conn = self.connections
327 for c in conn:
328 if c.endpoint_id == endpoint_id:
329 return True
330 return False
331
333 """
334 Check to see if this topic is connected to other publishers/subscribers
335 @return: True if topic is connected
336 @rtype: bool
337 """
338 if self.connections:
339 return True
340 return False
341
343
344 try:
345 self.connection_poll.remove_fd(c.fileno())
346 except:
347 pass
348 try:
349
350
351 c.close()
352 except:
353 pass
354
355
356
357 if c in connections:
358 connections.remove(c)
359
360 elif c.fileno():
361 matching_connections = [
362 conn for conn in connections if conn.fileno() == c.fileno()]
363 if len(matching_connections) == 1:
364 connections.remove(matching_connections[0])
365
367 """
368 Add a connection to this topic. If any previous connections
369 to same endpoint exist, drop them.
370
371 @param c: connection instance
372 @type c: Transport
373 @return: True if connection was added, ``bool``
374 """
375 rospyinfo("topic[%s] adding connection to [%s], count %s"%(self.resolved_name, c.endpoint_id, len(self.connections)))
376 with self.c_lock:
377
378 if self.closed:
379 rospyerr(
380 "ERROR: Race condition failure adding connection to closed subscriber\n"
381 "If you run into this please comment on "
382 "https://github.com/ros/ros_comm/issues/544"
383 )
384 return False
385
386
387
388
389 new_connections = self.connections[:]
390
391
392
393 for oldc in self.connections:
394 if oldc.endpoint_id == c.endpoint_id:
395 self._remove_connection(new_connections, oldc)
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411 for fd in self.connection_poll.error_iter():
412 to_remove = [x for x in new_connections if x.fileno() == fd]
413 for x in to_remove:
414 rospydebug("removing connection to %s, connection error detected"%(x.endpoint_id))
415 self._remove_connection(new_connections, x)
416
417
418
419 new_fd = c.fileno()
420 if new_fd is not None:
421 self.connection_poll.add_fd(new_fd)
422
423
424 new_connections.append(c)
425
426 self.connections = new_connections
427
428
429
430 if not c.cleanup_cb:
431 c.set_cleanup_callback(self.remove_connection)
432 else:
433 previous_callback = c.cleanup_cb
434 new_callback = self.remove_connection
435 def cleanup_cb_wrapper(s):
436 new_callback(s)
437 previous_callback(s)
438 c.set_cleanup_callback(cleanup_cb_wrapper)
439
440 return True
441
443 """
444 Remove connection from topic.
445 @param c: connection instance to remove
446 @type c: Transport
447 """
448 rospyinfo("topic[%s] removing connection to %s"%(self.resolved_name, c.endpoint_id))
449 with self.c_lock:
450
451
452
453 new_connections = self.connections[:]
454 self._remove_connection(new_connections, c)
455 self.connections = new_connections
456
458 """
459 Get the stats for this topic
460 @return: stats for topic in getBusInfo() format::
461 Publisher:
462 ((connection_id, destination_caller_id, direction, transport, topic_name, connected, connection_info_string)*)
463 Subscriber:
464 ((connection_id, publisher_xmlrpc_uri, direction, transport, topic_name, connected, connection_info_string)*)
465 @rtype: list
466 """
467
468 connections = self.connections
469 return [(c.id, c.endpoint_id, c.direction, c.transport_type, self.resolved_name, True, c.get_transport_info()) for c in connections]
470
472 """Get the stats for this topic (API stub)"""
473 raise Exception("subclasses must override")
474
475
476
477
478
480 """
481 Class for registering as a subscriber to a specified topic, where
482 the messages are of a given type.
483 """
484 - def __init__(self, name, data_class, callback=None, callback_args=None,
485 queue_size=None, buff_size=DEFAULT_BUFF_SIZE, tcp_nodelay=False):
486 """
487 Constructor.
488
489 NOTE: for the queue_size and buff_size
490 parameters, rospy does not attempt to do intelligent merging
491 between multiple Subscriber instances for the same topic. As
492 they share the same underlying transport, multiple Subscribers
493 to the same topic can conflict with one another if they set
494 these parameters differently.
495
496 @param name: graph resource name of topic, e.g. 'laser'.
497 @type name: str
498 @param data_class: data type class to use for messages,
499 e.g. std_msgs.msg.String
500 @type data_class: L{Message} class
501 @param callback: function to call ( fn(data)) when data is
502 received. If callback_args is set, the function must accept
503 the callback_args as a second argument, i.e. fn(data,
504 callback_args). NOTE: Additional callbacks can be added using
505 add_callback().
506 @type callback: fn(msg, cb_args)
507 @param callback_args: additional arguments to pass to the
508 callback. This is useful when you wish to reuse the same
509 callback for multiple subscriptions.
510 @type callback_args: any
511 @param queue_size: maximum number of messages to receive at
512 a time. This will generally be 1 or None (infinite,
513 default). buff_size should be increased if this parameter
514 is set as incoming data still needs to sit in the incoming
515 buffer before being discarded. Setting queue_size
516 buff_size to a non-default value affects all subscribers to
517 this topic in this process.
518 @type queue_size: int
519 @param buff_size: incoming message buffer size in bytes. If
520 queue_size is set, this should be set to a number greater
521 than the queue_size times the average message size. Setting
522 buff_size to a non-default value affects all subscribers to
523 this topic in this process.
524 @type buff_size: int
525 @param tcp_nodelay: if True, request TCP_NODELAY from
526 publisher. Use of this option is not generally recommended
527 in most cases as it is better to rely on timestamps in
528 message data. Setting tcp_nodelay to True enables TCP_NODELAY
529 for all subscribers in the same python process.
530 @type tcp_nodelay: bool
531 @raise ROSException: if parameters are invalid
532 """
533 super(Subscriber, self).__init__(name, data_class, Registration.SUB)
534
535
536
537
538 if queue_size is not None:
539 self.impl.set_queue_size(queue_size)
540 if buff_size != DEFAULT_BUFF_SIZE:
541 self.impl.set_buff_size(buff_size)
542
543 if callback is not None:
544
545
546
547 self.impl.add_callback(callback, callback_args)
548
549 self.callback = callback
550 self.callback_args = callback_args
551 else:
552
553 self.callback = self.callback_args = None
554 if tcp_nodelay:
555 self.impl.set_tcp_nodelay(tcp_nodelay)
556
558 """
559 unpublish/unsubscribe from topic. Topic instance is no longer
560 valid after this call. Additional calls to unregister() have no effect.
561 """
562 if self.impl:
563
564
565 if self.callback is not None:
566 self.impl.remove_callback(self.callback, self.callback_args)
567 self.callback = self.callback_args = None
568 super(Subscriber, self).unregister()
569
571 """
572 Underyling L{_TopicImpl} implementation for subscriptions.
573 """
575 """
576 ctor.
577 @param name: graph resource name of topic, e.g. 'laser'.
578 @type name: str
579 @param data_class: Message data class
580 @type data_class: L{Message} class
581 """
582 super(_SubscriberImpl, self).__init__(name, data_class)
583
584
585
586 self.callbacks = []
587 self.queue_size = None
588 self.buff_size = DEFAULT_BUFF_SIZE
589 self.tcp_nodelay = False
590 self.statistics_logger = SubscriberStatisticsLogger(self) \
591 if SubscriberStatisticsLogger.is_enabled() \
592 else None
593
595 """close I/O and release resources"""
596 _TopicImpl.close(self)
597 if self.callbacks:
598 del self.callbacks[:]
599 self.callbacks = None
600 if self.statistics_logger:
601 self.statistics_logger.shutdown()
602 self.statistics_logger = None
603
605 """
606 Set the value of TCP_NODELAY, which causes the Nagle algorithm
607 to be disabled for future topic connections, if the publisher
608 supports it.
609 """
610 self.tcp_nodelay = tcp_nodelay
611
613 """
614 Set the receive queue size. If more than queue_size messages
615 are waiting to be deserialized, they are discarded.
616
617 @param queue_size int: incoming queue size. Must be positive integer or None.
618 @type queue_size: int
619 """
620 if queue_size == -1:
621 self.queue_size = None
622 elif queue_size == 0:
623 raise ROSException("queue size may not be set to zero")
624 elif queue_size is not None and type(queue_size) != int:
625 raise ROSException("queue size must be an integer")
626 else:
627 self.queue_size = queue_size
628
630 """
631 Set the receive buffer size. The exact meaning of this is
632 transport dependent.
633 @param buff_size: receive buffer size
634 @type buff_size: int
635 """
636 if type(buff_size) != int:
637 raise ROSException("buffer size must be an integer")
638 elif buff_size <= 0:
639 raise ROSException("buffer size must be a positive integer")
640 self.buff_size = buff_size
641
643 """
644 Get the stats for this topic subscriber
645 @return: stats for topic in getBusStats() publisher format::
646 (topicName, connStats)
647 where connStats is::
648 [connectionId, bytesReceived, numSent, dropEstimate, connected]*
649 @rtype: list
650 """
651
652 conn = self.connections
653
654 stats = (self.resolved_name,
655 [(c.id, c.stat_bytes, c.stat_num_msg, -1, not c.done)
656 for c in conn] )
657 return stats
658
660 """
661 Register a callback to be invoked whenever a new message is received
662 @param cb: callback function to invoke with message data
663 instance, i.e. fn(data). If callback args is set, they will
664 be passed in as the second argument.
665 @type cb: fn(msg, cb_args)
666 @param cb_cargs: additional arguments to pass to callback
667 @type cb_cargs: Any
668 """
669 if self.closed:
670 raise ROSException("subscriber [%s] has been closed"%(self.resolved_name))
671 with self.c_lock:
672
673
674 new_callbacks = self.callbacks[:]
675 new_callbacks.append((cb, cb_args))
676 self.callbacks = new_callbacks
677
678
679 for c in self.connections:
680 if c.latch is not None:
681 self._invoke_callback(c.latch, cb, cb_args)
682
684 """
685 Unregister a message callback.
686 @param cb: callback function
687 @type cb: fn(msg, cb_args)
688 @param cb_cargs: additional arguments associated with callback
689 @type cb_cargs: Any
690 @raise KeyError: if no matching callback
691 """
692 if self.closed:
693 return
694 with self.c_lock:
695
696
697 matches = [x for x in self.callbacks if x[0] == cb and x[1] == cb_args]
698 if matches:
699 new_callbacks = self.callbacks[:]
700
701 new_callbacks.remove(matches[0])
702 self.callbacks = new_callbacks
703 if not matches:
704 raise KeyError("no matching cb")
705
707 """
708 Invoke callback on msg. Traps and logs any exceptions raise by callback
709 @param msg: message data
710 @type msg: L{Message}
711 @param cb: callback
712 @type cb: fn(msg, cb_args)
713 @param cb_args: callback args or None
714 @type cb_args: Any
715 """
716 try:
717 if cb_args is not None:
718 cb(msg, cb_args)
719 else:
720 cb(msg)
721 except Exception as e:
722 if not is_shutdown():
723 logerr("bad callback: %s\n%s"%(cb, traceback.format_exc()))
724 else:
725 _logger.warn("during shutdown, bad callback: %s\n%s"%(cb, traceback.format_exc()))
726
728 """
729 Called by underlying connection transport for each new message received
730 @param msgs: message data
731 @type msgs: [L{Message}]
732 """
733
734 callbacks = self.callbacks
735 for msg in msgs:
736 if self.statistics_logger:
737 self.statistics_logger.callback(msg, connection.callerid_pub, connection.stat_bytes)
738 for cb, cb_args in callbacks:
739 self._invoke_callback(msg, cb, cb_args)
740
742 """
743 Callback API to receive notifications when new subscribers
744 connect and disconnect.
745 """
746
748 """
749 callback when a peer has subscribed from a topic
750 @param topic_name: topic name. NOTE: topic name will be resolved/remapped
751 @type topic_name: str
752 @param topic_publish: method to publish message data to all subscribers
753 @type topic_publish: fn(data)
754 @param peer_publish: method to publish message data to
755 new subscriber. NOTE: behavior for the latter is
756 transport-dependent as some transports may be broadcast only.
757 @type peer_publish: fn(data)
758 """
759 pass
760
762 """
763 callback when a peer has unsubscribed from a topic
764 @param topic_name: topic name. NOTE: topic name will be resolved/remapped
765 @type topic_name: str
766 @param num_peers: number of remaining peers subscribed to topic
767 @type num_peers: int
768 """
769 pass
770
771
772
773
774
775
776
778 """
779 Class for registering as a publisher of a ROS topic.
780 """
781
782 - def __init__(self, name, data_class, subscriber_listener=None, tcp_nodelay=False, latch=False, headers=None, queue_size=None):
783 """
784 Constructor
785 @param name: resource name of topic, e.g. 'laser'.
786 @type name: str
787 @param data_class: message class for serialization
788 @type data_class: L{Message} class
789 @param subscriber_listener: listener for
790 subscription events. May be None.
791 @type subscriber_listener: L{SubscribeListener}
792 @param tcp_nodelay: If True, sets TCP_NODELAY on
793 publisher's socket (disables Nagle algorithm). This results
794 in lower latency publishing at the cost of efficiency.
795 @type tcp_nodelay: bool
796 @param latch: If True, the last message published is
797 'latched', meaning that any future subscribers will be sent
798 that message immediately upon connection.
799 @type latch: bool
800 @param headers: If not None, a dictionary with additional header
801 key-values being used for future connections.
802 @type headers: dict
803 @param queue_size: The queue size used for asynchronously
804 publishing messages from different threads. A size of zero
805 means an infinite queue, which can be dangerous. When the
806 keyword is not being used or when None is passed all
807 publishing will happen synchronously and a warning message
808 will be printed.
809 @type queue_size: int
810 @raise ROSException: if parameters are invalid
811 """
812 super(Publisher, self).__init__(name, data_class, Registration.PUB)
813
814 if subscriber_listener:
815 self.impl.add_subscriber_listener(subscriber_listener)
816 if tcp_nodelay:
817 get_tcpros_handler().set_tcp_nodelay(self.resolved_name, tcp_nodelay)
818 if latch:
819 self.impl.enable_latch()
820 if headers:
821 self.impl.add_headers(headers)
822 if queue_size is not None:
823 self.impl.set_queue_size(queue_size)
824 else:
825 import warnings
826 warnings.warn("The publisher should be created with an explicit keyword argument 'queue_size'. "
827 "Please see http://wiki.ros.org/rospy/Overview/Publishers%20and%20Subscribers for more information.", SyntaxWarning, stacklevel=2)
828
830 """
831 Publish message data object to this topic.
832 Publish can either be called with the message instance to
833 publish or with the constructor args for a new Message
834 instance, i.e.::
835 pub.publish(message_instance)
836 pub.publish(message_field_1, message_field_2...)
837 pub.publish(message_field_1='foo', message_field_2='bar')
838
839 @param args : L{Message} instance, message arguments, or no args if keyword arguments are used
840 @param kwds : Message keyword arguments. If kwds are used, args must be unset
841 @raise ROSException: If rospy node has not been initialized
842 @raise ROSSerializationException: If unable to serialize
843 message. This is usually a type error with one of the fields.
844 """
845 if self.impl is None:
846 raise ROSException("publish() to an unregistered() handle")
847 if not is_initialized():
848 raise ROSException("ROS node has not been initialized yet. Please call init_node() first")
849 data = args_kwds_to_message(self.data_class, args, kwds)
850 try:
851 self.impl.acquire()
852 self.impl.publish(data)
853 except genpy.SerializationError as e:
854
855 _logger.error(traceback.format_exc())
856 raise ROSSerializationException(str(e))
857 finally:
858 self.impl.release()
859
861 """
862 Underyling L{_TopicImpl} implementation for publishers.
863 """
864
866 """
867 @param name: name of topic, e.g. 'laser'.
868 @type name: str
869 @param data_class: Message data class
870 @type data_class: L{Message} class
871 """
872 super(_PublisherImpl, self).__init__(name, data_class)
873 if python3 == 0:
874 self.buff = StringIO()
875 else:
876 self.buff = BytesIO()
877 self.publock = threading.RLock()
878 self.subscriber_listeners = []
879
880
881 self.headers = {}
882
883
884 self.is_latch = False
885 self.latch = None
886
887
888 self.queue_size = None
889
890
891 self.message_data_sent = 0
892
894 """close I/O and release resources"""
895 _TopicImpl.close(self)
896
897 if self.subscriber_listeners:
898 del self.subscriber_listeners[:]
899 if self.headers:
900 self.headers.clear()
901 if self.buff is not None:
902 self.buff.close()
903 self.publock = self.headers = self.buff = self.subscriber_listeners = None
904
906 """
907 Add connection headers to this Topic for future connections.
908 @param headers: key/values will be added to current connection
909 header set, overriding any existing keys if they conflict.
910 @type headers: dict
911 """
912 self.headers.update(headers)
913
915 """
916 Enable publish() latch. The latch contains the last published
917 message and is sent to any new subscribers.
918 """
919 self.is_latch = True
920
922 self.queue_size = queue_size
923
925 """
926 Get the stats for this topic publisher
927 @return: stats for topic in getBusStats() publisher format::
928 [topicName, messageDataBytes, connStats],
929 where connStats is::
930 [id, bytes, numMessages, connected]*
931 @rtype: list
932 """
933
934 conn = self.connections
935 return (self.resolved_name, self.message_data_sent,
936 [(c.id, c.stat_bytes, c.stat_num_msg, not c.done) for c in conn] )
937
939 """
940 Add a L{SubscribeListener} for subscribe events.
941 @param l: listener instance
942 @type l: L{SubscribeListener}
943 """
944 self.subscriber_listeners.append(l)
945
947 """lock for thread-safe publishing to this transport"""
948 if self.publock is not None:
949 self.publock.acquire()
950
952 """lock for thread-safe publishing to this transport"""
953 if self.publock is not None:
954 self.publock.release()
955
957 """
958 Add a connection to this topic. This must be a PubTransport. If
959 the latch is enabled, c will be sent a the value of the
960 latch.
961 @param c: connection instance
962 @type c: L{Transport}
963 @return: True if connection was added
964 @rtype: bool
965 """
966 if self.queue_size is not None:
967 c = QueuedConnection(c, self.queue_size)
968 super(_PublisherImpl, self).add_connection(c)
969 def publish_single(data):
970 self.publish(data, connection_override=c)
971 for l in self.subscriber_listeners:
972 l.peer_subscribe(self.resolved_name, self.publish, publish_single)
973 if self.is_latch and self.latch is not None:
974 with self.publock:
975 self.publish(self.latch, connection_override=c)
976 return True
977
979 """
980 Remove existing connection from this topic.
981 @param c: connection instance to remove
982 @type c: L{Transport}
983 """
984 super(_PublisherImpl, self).remove_connection(c)
985 num = len(self.connections)
986 for l in self.subscriber_listeners:
987 l.peer_unsubscribe(self.resolved_name, num)
988
989 - def publish(self, message, connection_override=None):
990 """
991 Publish the data to the topic. If the topic has no subscribers,
992 the method will return without any affect. Access to publish()
993 should be locked using acquire() and release() in order to
994 ensure proper message publish ordering.
995
996 @param message: message data instance to publish
997 @type message: L{Message}
998 @param connection_override: publish to this connection instead of all
999 @type connection_override: L{Transport}
1000 @return: True if the data was published, False otherwise.
1001 @rtype: bool
1002 @raise genpy.SerializationError: if L{Message} instance is unable to serialize itself
1003 @raise rospy.ROSException: if topic has been closed or was closed during publish()
1004 """
1005
1006
1007 if self.closed:
1008
1009
1010 if not is_shutdown():
1011 raise ROSException("publish() to a closed topic")
1012 else:
1013 return
1014
1015 if self.is_latch:
1016 self.latch = message
1017
1018 if not self.has_connections():
1019
1020 return False
1021
1022 if connection_override is None:
1023
1024 conns = self.connections
1025 else:
1026 conns = [connection_override]
1027
1028
1029
1030 b = self.buff
1031 try:
1032 b.tell()
1033
1034
1035 self.seq += 1
1036 serialize_message(b, self.seq, message)
1037
1038
1039 err_con = []
1040 data = b.getvalue()
1041
1042 for c in conns:
1043 try:
1044 if not is_shutdown():
1045 c.write_data(data)
1046 except TransportTerminated as e:
1047 logdebug("publisher connection to [%s] terminated, see errorlog for details:\n%s"%(c.endpoint_id, traceback.format_exc()))
1048 err_con.append(c)
1049 except Exception as e:
1050
1051 logdebug("publisher connection to [%s] terminated, see errorlog for details:\n%s"%(c.endpoint_id, traceback.format_exc()))
1052 err_con.append(c)
1053
1054
1055 self.message_data_sent += b.tell()
1056 b.seek(0)
1057 b.truncate(0)
1058
1059 except ValueError:
1060
1061
1062
1063 if self.closed:
1064 if is_shutdown():
1065
1066
1067 return
1068 else:
1069
1070
1071 raise ROSException("topic was closed during publish()")
1072 else:
1073
1074 raise
1075
1076
1077 for c in err_con:
1078 try:
1079
1080
1081 c.close()
1082 except:
1083 pass
1084
1085
1086
1087
1089 """
1090 Tracks Topic objects
1091 See L{get_topic_manager()} for singleton access
1092 """
1093
1095 """ctor."""
1096 super(_TopicManager, self).__init__()
1097 self.pubs = {}
1098 self.subs = {}
1099 self.topics = set()
1100 self.lock = threading.Condition()
1101 self.closed = False
1102 _logger.info("topicmanager initialized")
1103
1105 """
1106 get topic publisher and subscriber connection info for getBusInfo() api
1107 @return: [bus info stats]
1108 See getBusInfo() API for more data structure details.
1109 @rtype: list
1110 """
1111 with self.lock:
1112 info = []
1113 for s in chain(iter(self.pubs.values()), iter(self.subs.values())):
1114 info.extend(s.get_stats_info())
1115 return info
1116
1118 """
1119 get topic publisher and subscriber stats for getBusStats() api
1120 @return: [publisherStats, subscriberStats].
1121 See getBusStats() API for more data structure details.
1122 @rtype: list
1123 """
1124 with self.lock:
1125 return [s.get_stats() for s in self.pubs.values()],\
1126 [s.get_stats() for s in self.subs.values()]
1127
1129 """
1130 Close all registered publication and subscriptions. Manager is
1131 no longer usable after close.
1132 """
1133 self.closed = True
1134 with self.lock:
1135 for t in chain(iter(self.pubs.values()), iter(self.subs.values())):
1136 t.close()
1137 self.pubs.clear()
1138 self.subs.clear()
1139
1140 - def _add(self, ps, rmap, reg_type):
1141 """
1142 Add L{_TopicImpl} instance to rmap
1143 @param ps: a pub/sub impl instance
1144 @type ps: L{_TopicImpl}
1145 @param rmap: { topic: _TopicImpl} rmap to record instance in
1146 @type rmap: dict
1147 @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB}
1148 @type reg_type: str
1149 """
1150 resolved_name = ps.resolved_name
1151 _logger.debug("tm._add: %s, %s, %s", resolved_name, ps.type, reg_type)
1152 with self.lock:
1153 rmap[resolved_name] = ps
1154 self.topics.add(resolved_name)
1155
1156
1157
1158 get_registration_listeners().notify_added(resolved_name, ps.type, reg_type)
1159
1161 """recalculate self.topics. expensive"""
1162 self.topics = set([x.resolved_name for x in self.pubs.values()] +
1163 [x.resolved_name for x in self.subs.values()])
1164
1165 - def _remove(self, ps, rmap, reg_type):
1166 """
1167 Remove L{_TopicImpl} instance from rmap
1168 @param ps: a pub/sub impl instance
1169 @type ps: L{_TopicImpl}
1170 @param rmap: topic->_TopicImpl rmap to remove instance in
1171 @type rmap: dict
1172 @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB}
1173 @type reg_type: str
1174 """
1175 resolved_name = ps.resolved_name
1176 _logger.debug("tm._remove: %s, %s, %s", resolved_name, ps.type, reg_type)
1177 with self.lock:
1178 del rmap[resolved_name]
1179 self. _recalculate_topics()
1180
1181
1182
1183 get_registration_listeners().notify_removed(resolved_name, ps.type, reg_type)
1184
1185 - def get_impl(self, reg_type, resolved_name):
1186 """
1187 Get the L{_TopicImpl} for the specified topic. This is mainly for
1188 testing purposes. Unlike acquire_impl, it does not alter the
1189 ref count.
1190 @param resolved_name: resolved topic name
1191 @type resolved_name: str
1192 @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB}
1193 @type reg_type: str
1194 """
1195 if reg_type == Registration.PUB:
1196 rmap = self.pubs
1197 elif reg_type == Registration.SUB:
1198 rmap = self.subs
1199 else:
1200 raise TypeError("invalid reg_type: %s"%s)
1201 return rmap.get(resolved_name, None)
1202
1203 - def acquire_impl(self, reg_type, resolved_name, data_class):
1204 """
1205 Acquire a L{_TopicImpl} for the specified topic (create one if it
1206 doesn't exist). Every L{Topic} instance has a _TopicImpl that
1207 actually controls the topic resources so that multiple Topic
1208 instances use the same underlying connections. 'Acquiring' a
1209 topic implementation marks that another Topic instance is
1210 using the TopicImpl.
1211
1212 @param resolved_name: resolved topic name
1213 @type resolved_name: str
1214
1215 @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB}
1216 @type reg_type: str
1217
1218 @param data_class: message class for topic
1219 @type data_class: L{Message} class
1220 """
1221 if reg_type == Registration.PUB:
1222 rmap = self.pubs
1223 impl_class = _PublisherImpl
1224 elif reg_type == Registration.SUB:
1225 rmap = self.subs
1226 impl_class = _SubscriberImpl
1227 else:
1228 raise TypeError("invalid reg_type: %s"%s)
1229 with self.lock:
1230 impl = rmap.get(resolved_name, None)
1231 if not impl:
1232 impl = impl_class(resolved_name, data_class)
1233 self._add(impl, rmap, reg_type)
1234 impl.ref_count += 1
1235 return impl
1236
1238 """
1239 Release a L_{TopicImpl} for the specified topic.
1240
1241 Every L{Topic} instance has a _TopicImpl that actually
1242 controls the topic resources so that multiple Topic instances
1243 use the same underlying connections. 'Acquiring' a topic
1244 implementation marks that another Topic instance is using the
1245 TopicImpl.
1246
1247 @param resolved_name: resolved topic name
1248 @type resolved_name: str
1249 @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB}
1250 @type reg_type: str
1251 """
1252 if reg_type == Registration.PUB:
1253 rmap = self.pubs
1254 else:
1255 rmap = self.subs
1256 with self.lock:
1257
1258 if self.closed:
1259 return
1260 impl = rmap.get(resolved_name, None)
1261 assert impl is not None, "cannot release topic impl as impl [%s] does not exist"%resolved_name
1262 impl.ref_count -= 1
1263 assert impl.ref_count >= 0, "topic impl's reference count has gone below zero"
1264 if impl.ref_count == 0:
1265 rospyinfo("topic impl's ref count is zero, deleting topic %s...", resolved_name)
1266 impl.close()
1267 self._remove(impl, rmap, reg_type)
1268 del impl
1269 _logger.debug("... done deleting topic %s", resolved_name)
1270
1272 """
1273 @param resolved_name: resolved topic name
1274 @type resolved_name: str
1275 @return: list of L{_PublisherImpl}s
1276 @rtype: [L{_PublisherImpl}]
1277 """
1278 return self.pubs.get(resolved_name, None)
1279
1281 """
1282 @param resolved_name: topic name
1283 @type resolved_name: str
1284 @return: subscriber for the specified topic.
1285 @rtype: L{_SubscriberImpl}
1286 """
1287 return self.subs.get(resolved_name, None)
1288
1290 """
1291 @param resolved_name: resolved topic name
1292 @type resolved_name: str
1293 @return: True if manager has subscription for specified topic
1294 @rtype: bool
1295 """
1296 return resolved_name in self.subs
1297
1299 """
1300 @param resolved_name: resolved topic name
1301 @type resolved_name: str
1302 @return: True if manager has publication for specified topic
1303 @rtype: bool
1304 """
1305 return resolved_name in self.pubs
1306
1308 """
1309 @return: list of topic names this node subscribes to/publishes
1310 @rtype: [str]
1311 """
1312 return self.topics
1313
1315 return [[k, v.type] for k, v in rmap.items()]
1316
1317
1319 return self._get_list(self.subs)
1320
1321
1323 return self._get_list(self.pubs)
1324
1325 set_topic_manager(_TopicManager())
1326