1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 """
36 rospy implementation of topics.
37
38 Client API
39 ==========
40
41 L{Publisher} and L{Subscriber} are the client API for topics.
42
43 Internal Implementation
44 =======================
45
46 Internally, L{_TopicImpl} instances (_PublisherImpl/_SubscriberImpl)
47 are used to manage actual transport connections. The L{_TopicManager}
48 is responsible for tracking the system-wide state of publications and
49 subscriptions as well as the L{_TopicImpl} instances. More info is below.
50
51 L{_TopicManager}
52 ================
53
54 The L{_TopicManager} does the backend topic bookkeeping for the local
55 node. Use L{get_topic_manager()} to access singleton. Actual topic
56 implementations are done through the
57 L{_TopicImpl}/L{_PublisherImpl}/L{_SubscriberImpl} hierarchy. Client
58 code generates instances of type L{Publisher}/L{Subscriber}, which
59 enable to client to create multiple publishers/subscribers of that
60 topic that get controlled access to the underlying share connections.
61
62 Common parent classes for all rospy topics. The rospy topic autogenerators
63 create classes that are children of these implementations.
64 """
65
66
67 import struct
68 import select
69 try:
70 from cStringIO import StringIO
71 python3 = 0
73 return isinstance(s, basestring)
74 except ImportError:
75 python3 = 1
76 from io import StringIO, BytesIO
78 return isinstance(s, str)
79
80 import threading
81 import logging
82 import time
83
84 from itertools import chain
85 import traceback
86
87 import rosgraph.names
88
89 from rospy.core import *
90 from rospy.exceptions import ROSSerializationException, TransportTerminated
91 from rospy.msg import serialize_message, args_kwds_to_message
92
93 from rospy.impl.statistics import SubscriberStatisticsLogger
94
95 from rospy.impl.registration import get_topic_manager, set_topic_manager, Registration, get_registration_listeners
96 from rospy.impl.tcpros import get_tcpros_handler, DEFAULT_BUFF_SIZE
97 from rospy.impl.tcpros_pubsub import QueuedConnection
98
99 _logger = logging.getLogger('rospy.topics')
100
101
102 import genpy
103 Message = genpy.Message
104
105
106
107
108
109
110
111
112 if not hasattr(select, 'EPOLLRDHUP'):
113 select.EPOLLRDHUP = 0x2000
114
115
117 """Base class of L{Publisher} and L{Subscriber}"""
118
119 - def __init__(self, name, data_class, reg_type):
120 """
121 @param name: graph resource name of topic, e.g. 'laser'.
122 @type name: str
123 @param data_class: message class for serialization
124 @type data_class: L{Message}
125 @param reg_type Registration.PUB or Registration.SUB
126 @type reg_type: str
127 @raise ValueError: if parameters are invalid
128 """
129
130 if not name or not isstring(name):
131 raise ValueError("topic name is not a non-empty string")
132 try:
133 if python3 == 1:
134 name.encode("utf-8")
135 else:
136 name = name.encode("utf-8")
137 except UnicodeError:
138 raise ValueError("topic name must be ascii/utf-8 compatible")
139 if data_class is None:
140 raise ValueError("topic parameter 'data_class' is not initialized")
141 if not type(data_class) == type:
142 raise ValueError("data_class [%s] is not a class"%data_class)
143 if not issubclass(data_class, genpy.Message):
144 raise ValueError("data_class [%s] is not a message data class"%data_class.__class__.__name__)
145
146 if not rosgraph.names.is_legal_name(name):
147 import warnings
148 warnings.warn("'%s' is not a legal ROS graph resource name. This may cause problems with other ROS tools"%name, stacklevel=2)
149
150
151
152 if not rospy.core.is_initialized():
153 self.resolved_name = rospy.names.resolve_name_without_node_name(name)
154 else:
155
156 self.resolved_name = resolve_name(name)
157
158 self.name = self.resolved_name
159
160 self.data_class = data_class
161 self.type = data_class._type
162 self.md5sum = data_class._md5sum
163 self.reg_type = reg_type
164 self.impl = get_topic_manager().acquire_impl(reg_type, self.resolved_name, data_class)
165
167 """
168 get the number of connections to other ROS nodes for this topic. For a Publisher,
169 this corresponds to the number of nodes subscribing. For a Subscriber, the number
170 of publishers.
171 @return: number of connections
172 @rtype: int
173 """
174 return self.impl.get_num_connections()
175
177 """
178 unpublish/unsubscribe from topic. Topic instance is no longer
179 valid after this call. Additional calls to unregister() have no effect.
180 """
181
182
183 resolved_name = self.resolved_name
184 if resolved_name and self.impl:
185 get_topic_manager().release_impl(self.reg_type, resolved_name)
186 self.impl = self.resolved_name = self.type = self.md5sum = self.data_class = None
187
188
190 """
191 select.poll/kqueue abstraction to handle socket failure detection
192 on multiple platforms. NOT thread-safe.
193 """
195 if hasattr(select, 'kqueue'):
196 self.poller = select.kqueue()
197 self.add_fd = self.add_kqueue
198 self.remove_fd = self.remove_kqueue
199 self.error_iter = self.error_kqueue_iter
200 self.kevents = []
201 elif hasattr(select, 'epoll'):
202 self.poller = select.epoll()
203 self.add_fd = self.add_epoll
204 self.remove_fd = self.remove_epoll
205 self.error_iter = self.error_epoll_iter
206 elif hasattr(select, 'poll'):
207 self.poller = select.poll()
208 self.add_fd = self.add_poll
209 self.remove_fd = self.remove_poll
210 self.error_iter = self.error_poll_iter
211 else:
212
213 self.poller = self.noop
214 self.add_fd = self.noop
215 self.remove_fd = self.noop
216 self.error_iter = self.noop_iter
217
218 - def noop(self, *args):
220
222 empty_generator = (x for x in list())
223 for x in empty_generator:
224 yield x
225
228
231
233 events = self.poller.poll(0)
234 for fd, event in events:
235 if event & (select.POLLHUP | select.POLLERR):
236 yield fd
237
239 self.poller.register(fd, select.EPOLLHUP|select.EPOLLERR|select.EPOLLRDHUP)
240
243
245 events = self.poller.poll(0)
246 for fd, event in events:
247 if event & (select.EPOLLHUP | select.EPOLLERR | select.EPOLLRDHUP):
248 yield fd
249
251 self.kevents.append(select.kevent(fd))
252
254 events = self.poller.control(self.kevents, 0)
255 for event in events:
256 if event & (select.KQ_EV_ERROR | select.KQ_EV_EOF):
257 yield event.ident
258
260 e = [x for x in self.kevents if x.ident == fd]
261 for x in e:
262 self.kevents.remove(x)
263
265 """
266 Base class of internal topic implementations. Each topic has a
267 singleton _TopicImpl implementation for managing the underlying
268 connections.
269 """
270
272 """
273 Base constructor
274 @param name: graph resource name of topic, e.g. 'laser'.
275 @type name: str
276 @param data_class: message data class
277 @type data_class: L{Message}
278 """
279
280
281 self.resolved_name = resolve_name(name)
282 self.name = self.resolved_name
283
284 self.data_class = data_class
285 self.type = data_class._type
286 self.handler = None
287 self.seq = 0
288
289
290
291
292
293
294
295 self.c_lock = threading.RLock()
296 self.connections = []
297 self.closed = False
298
299 self.ref_count = 0
300
301 self.connection_poll = Poller()
302
304
305 if self.closed:
306 return
307 if self.connections is not None:
308 for c in self.connections:
309 try:
310 c.close()
311 except:
312 pass
313 del self.connections[:]
314 self.c_lock = self.connections = self.handler = self.data_class = self.type = None
315
317 """close I/O"""
318 if self.closed:
319 return
320 self.closed = True
321 if self.c_lock is not None:
322 with self.c_lock:
323 for c in self.connections:
324 try:
325 if c is not None:
326 c.close()
327 except:
328
329 _logger.error(traceback.format_exc())
330 del self.connections[:]
331 self.handler = None
332
334 with self.c_lock:
335 return len(self.connections)
336
338 """
339 Query whether or not a connection with the associated \a
340 endpoint has been added to this object.
341 @param endpoint_id: endpoint ID associated with connection.
342 @type endpoint_id: str
343 """
344
345 conn = self.connections
346 for c in conn:
347 if c.endpoint_id == endpoint_id:
348 return True
349 return False
350
352 """
353 Check to see if this topic is connected to other publishers/subscribers
354 @return: True if topic is connected
355 @rtype: bool
356 """
357 if self.connections:
358 return True
359 return False
360
362
363 try:
364 self.connection_poll.remove_fd(c.fileno())
365 except:
366 pass
367 try:
368
369
370 c.close()
371 except:
372 pass
373
374
375
376 if c in connections:
377 connections.remove(c)
378
379 elif c.fileno():
380 matching_connections = [
381 conn for conn in connections if conn.fileno() == c.fileno()]
382 if len(matching_connections) == 1:
383 connections.remove(matching_connections[0])
384
386 """
387 Add a connection to this topic. If any previous connections
388 to same endpoint exist, drop them.
389
390 @param c: connection instance
391 @type c: Transport
392 @return: True if connection was added, ``bool``
393 """
394 rospyinfo("topic[%s] adding connection to [%s], count %s"%(self.resolved_name, c.endpoint_id, len(self.connections)))
395 with self.c_lock:
396
397 if self.closed:
398 rospyerr(
399 "ERROR: Race condition failure adding connection to closed subscriber\n"
400 "If you run into this please comment on "
401 "https://github.com/ros/ros_comm/issues/544"
402 )
403 return False
404
405
406
407
408 new_connections = self.connections[:]
409
410
411
412 for oldc in self.connections:
413 if oldc.endpoint_id == c.endpoint_id:
414 self._remove_connection(new_connections, oldc)
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430 for fd in self.connection_poll.error_iter():
431 to_remove = [x for x in new_connections if x.fileno() == fd]
432 for x in to_remove:
433 rospydebug("removing connection to %s, connection error detected"%(x.endpoint_id))
434 self._remove_connection(new_connections, x)
435
436
437
438 new_fd = c.fileno()
439 if new_fd is not None:
440 self.connection_poll.add_fd(new_fd)
441
442
443 new_connections.append(c)
444
445 self.connections = new_connections
446
447
448
449 if not c.cleanup_cb:
450 c.set_cleanup_callback(self.remove_connection)
451 else:
452 previous_callback = c.cleanup_cb
453 new_callback = self.remove_connection
454 def cleanup_cb_wrapper(s):
455 new_callback(s)
456 previous_callback(s)
457 c.set_cleanup_callback(cleanup_cb_wrapper)
458
459 return True
460
462 fds_to_remove = list(self.connection_poll.error_iter())
463 if fds_to_remove:
464 with self.c_lock:
465 new_connections = self.connections[:]
466 to_remove = [x for x in new_connections if x.fileno() in fds_to_remove]
467 for x in to_remove:
468 rospydebug("removing connection to %s, connection error detected"%(x.endpoint_id))
469 self._remove_connection(new_connections, x)
470 self.connections = new_connections
471
473 """
474 Remove connection from topic.
475 @param c: connection instance to remove
476 @type c: Transport
477 """
478 rospyinfo("topic[%s] removing connection to %s"%(self.resolved_name, c.endpoint_id))
479 with self.c_lock:
480
481
482
483 new_connections = self.connections[:]
484 self._remove_connection(new_connections, c)
485 self.connections = new_connections
486
488 """
489 Get the stats for this topic
490 @return: stats for topic in getBusInfo() format::
491 Publisher:
492 ((connection_id, destination_caller_id, direction, transport, topic_name, connected, connection_info_string)*)
493 Subscriber:
494 ((connection_id, publisher_xmlrpc_uri, direction, transport, topic_name, connected, connection_info_string)*)
495 @rtype: list
496 """
497
498 connections = self.connections
499 return [(c.id, c.endpoint_id, c.direction, c.transport_type, self.resolved_name, True, c.get_transport_info()) for c in connections]
500
502 """Get the stats for this topic (API stub)"""
503 raise Exception("subclasses must override")
504
505
506
507
508
510 """
511 Class for registering as a subscriber to a specified topic, where
512 the messages are of a given type.
513 """
514 - def __init__(self, name, data_class, callback=None, callback_args=None,
515 queue_size=None, buff_size=DEFAULT_BUFF_SIZE, tcp_nodelay=False):
516 """
517 Constructor.
518
519 NOTE: for the queue_size and buff_size
520 parameters, rospy does not attempt to do intelligent merging
521 between multiple Subscriber instances for the same topic. As
522 they share the same underlying transport, multiple Subscribers
523 to the same topic can conflict with one another if they set
524 these parameters differently.
525
526 @param name: graph resource name of topic, e.g. 'laser'.
527 @type name: str
528 @param data_class: data type class to use for messages,
529 e.g. std_msgs.msg.String
530 @type data_class: L{Message} class
531 @param callback: function to call ( fn(data)) when data is
532 received. If callback_args is set, the function must accept
533 the callback_args as a second argument, i.e. fn(data,
534 callback_args). NOTE: Additional callbacks can be added using
535 add_callback().
536 @type callback: fn(msg, cb_args)
537 @param callback_args: additional arguments to pass to the
538 callback. This is useful when you wish to reuse the same
539 callback for multiple subscriptions.
540 @type callback_args: any
541 @param queue_size: maximum number of messages to receive at
542 a time. This will generally be 1 or None (infinite,
543 default). buff_size should be increased if this parameter
544 is set as incoming data still needs to sit in the incoming
545 buffer before being discarded. Setting queue_size
546 buff_size to a non-default value affects all subscribers to
547 this topic in this process.
548 @type queue_size: int
549 @param buff_size: incoming message buffer size in bytes. If
550 queue_size is set, this should be set to a number greater
551 than the queue_size times the average message size. Setting
552 buff_size to a non-default value affects all subscribers to
553 this topic in this process.
554 @type buff_size: int
555 @param tcp_nodelay: if True, request TCP_NODELAY from
556 publisher. Use of this option is not generally recommended
557 in most cases as it is better to rely on timestamps in
558 message data. Setting tcp_nodelay to True enables TCP_NODELAY
559 for all subscribers in the same python process.
560 @type tcp_nodelay: bool
561 @raise ROSException: if parameters are invalid
562 """
563 super(Subscriber, self).__init__(name, data_class, Registration.SUB)
564
565
566
567
568 if queue_size is not None:
569 self.impl.set_queue_size(queue_size)
570 if buff_size != DEFAULT_BUFF_SIZE:
571 self.impl.set_buff_size(buff_size)
572
573 if callback is not None:
574
575
576
577 self.impl.add_callback(callback, callback_args)
578
579 self.callback = callback
580 self.callback_args = callback_args
581 else:
582
583 self.callback = self.callback_args = None
584 if tcp_nodelay:
585 self.impl.set_tcp_nodelay(tcp_nodelay)
586
588 """
589 unpublish/unsubscribe from topic. Topic instance is no longer
590 valid after this call. Additional calls to unregister() have no effect.
591 """
592 if self.impl:
593
594
595 if self.callback is not None:
596 self.impl.remove_callback(self.callback, self.callback_args)
597 self.callback = self.callback_args = None
598 super(Subscriber, self).unregister()
599
601 """
602 Underlying L{_TopicImpl} implementation for subscriptions.
603 """
605 """
606 ctor.
607 @param name: graph resource name of topic, e.g. 'laser'.
608 @type name: str
609 @param data_class: Message data class
610 @type data_class: L{Message} class
611 """
612 super(_SubscriberImpl, self).__init__(name, data_class)
613
614
615
616 self.callbacks = []
617 self.queue_size = None
618 self.buff_size = DEFAULT_BUFF_SIZE
619 self.tcp_nodelay = False
620 self.statistics_logger = SubscriberStatisticsLogger(self) \
621 if SubscriberStatisticsLogger.is_enabled() \
622 else None
623
625 """close I/O and release resources"""
626 _TopicImpl.close(self)
627 if self.callbacks:
628 del self.callbacks[:]
629 self.callbacks = None
630 if self.statistics_logger:
631 self.statistics_logger.shutdown()
632 self.statistics_logger = None
633
635 """
636 Set the value of TCP_NODELAY, which causes the Nagle algorithm
637 to be disabled for future topic connections, if the publisher
638 supports it.
639 """
640 self.tcp_nodelay = tcp_nodelay
641
643 """
644 Set the receive queue size. If more than queue_size messages
645 are waiting to be deserialized, they are discarded.
646
647 @param queue_size int: incoming queue size. Must be positive integer or None.
648 @type queue_size: int
649 """
650 if queue_size == -1:
651 self.queue_size = None
652 elif queue_size == 0:
653 raise ROSException("queue size may not be set to zero")
654 elif queue_size is not None and type(queue_size) != int:
655 raise ROSException("queue size must be an integer")
656 else:
657 self.queue_size = queue_size
658
660 """
661 Set the receive buffer size. The exact meaning of this is
662 transport dependent.
663 @param buff_size: receive buffer size
664 @type buff_size: int
665 """
666 if type(buff_size) != int:
667 raise ROSException("buffer size must be an integer")
668 elif buff_size <= 0:
669 raise ROSException("buffer size must be a positive integer")
670 self.buff_size = buff_size
671
673 """
674 Get the stats for this topic subscriber
675 @return: stats for topic in getBusStats() publisher format::
676 (topicName, connStats)
677 where connStats is::
678 [connectionId, bytesReceived, numSent, dropEstimate, connected]*
679 @rtype: list
680 """
681
682 conn = self.connections
683
684 stats = (self.resolved_name,
685 [(c.id, c.stat_bytes, c.stat_num_msg, -1, not c.done)
686 for c in conn] )
687 return stats
688
690 """
691 Register a callback to be invoked whenever a new message is received
692 @param cb: callback function to invoke with message data
693 instance, i.e. fn(data). If callback args is set, they will
694 be passed in as the second argument.
695 @type cb: fn(msg, cb_args)
696 @param cb_cargs: additional arguments to pass to callback
697 @type cb_cargs: Any
698 """
699 if self.closed:
700 raise ROSException("subscriber [%s] has been closed"%(self.resolved_name))
701 with self.c_lock:
702
703
704 new_callbacks = self.callbacks[:]
705 new_callbacks.append((cb, cb_args))
706 self.callbacks = new_callbacks
707
708
709 for c in self.connections:
710 if c.latch is not None:
711 self._invoke_callback(c.latch, cb, cb_args)
712
714 """
715 Unregister a message callback.
716 @param cb: callback function
717 @type cb: fn(msg, cb_args)
718 @param cb_cargs: additional arguments associated with callback
719 @type cb_cargs: Any
720 @raise KeyError: if no matching callback
721 """
722 if self.closed:
723 return
724 with self.c_lock:
725
726
727 matches = [x for x in self.callbacks if x[0] == cb and x[1] == cb_args]
728 if matches:
729 new_callbacks = self.callbacks[:]
730
731 new_callbacks.remove(matches[0])
732 self.callbacks = new_callbacks
733 if not matches:
734 raise KeyError("no matching cb")
735
737 """
738 Invoke callback on msg. Traps and logs any exceptions raise by callback
739 @param msg: message data
740 @type msg: L{Message}
741 @param cb: callback
742 @type cb: fn(msg, cb_args)
743 @param cb_args: callback args or None
744 @type cb_args: Any
745 """
746 try:
747 if cb_args is not None:
748 cb(msg, cb_args)
749 else:
750 cb(msg)
751 except Exception as e:
752 if not is_shutdown():
753 logerr("bad callback: %s\n%s"%(cb, traceback.format_exc()))
754 else:
755 _logger.warn("during shutdown, bad callback: %s\n%s"%(cb, traceback.format_exc()))
756
758 """
759 Called by underlying connection transport for each new message received
760 @param msgs: message data
761 @type msgs: [L{Message}]
762 """
763
764 callbacks = self.callbacks
765 for msg in msgs:
766 if self.statistics_logger:
767 self.statistics_logger.callback(msg, connection.callerid_pub, connection.stat_bytes)
768 for cb, cb_args in callbacks:
769 self._invoke_callback(msg, cb, cb_args)
770
772 """
773 Callback API to receive notifications when new subscribers
774 connect and disconnect.
775 """
776
778 """
779 callback when a peer has subscribed from a topic
780 @param topic_name: topic name. NOTE: topic name will be resolved/remapped
781 @type topic_name: str
782 @param topic_publish: method to publish message data to all subscribers
783 @type topic_publish: fn(data)
784 @param peer_publish: method to publish message data to
785 new subscriber. NOTE: behavior for the latter is
786 transport-dependent as some transports may be broadcast only.
787 @type peer_publish: fn(data)
788 """
789 pass
790
792 """
793 callback when a peer has unsubscribed from a topic
794 @param topic_name: topic name. NOTE: topic name will be resolved/remapped
795 @type topic_name: str
796 @param num_peers: number of remaining peers subscribed to topic
797 @type num_peers: int
798 """
799 pass
800
801
802
803
804
805
806
808 """
809 Class for registering as a publisher of a ROS topic.
810 """
811
812 - def __init__(self, name, data_class, subscriber_listener=None, tcp_nodelay=False, latch=False, headers=None, queue_size=None):
813 """
814 Constructor
815 @param name: resource name of topic, e.g. 'laser'.
816 @type name: str
817 @param data_class: message class for serialization
818 @type data_class: L{Message} class
819 @param subscriber_listener: listener for
820 subscription events. May be None.
821 @type subscriber_listener: L{SubscribeListener}
822 @param tcp_nodelay: If True, sets TCP_NODELAY on
823 publisher's socket (disables Nagle algorithm). This results
824 in lower latency publishing at the cost of efficiency.
825 @type tcp_nodelay: bool
826 @param latch: If True, the last message published is
827 'latched', meaning that any future subscribers will be sent
828 that message immediately upon connection.
829 @type latch: bool
830 @param headers: If not None, a dictionary with additional header
831 key-values being used for future connections.
832 @type headers: dict
833 @param queue_size: The queue size used for asynchronously
834 publishing messages from different threads. A size of zero
835 means an infinite queue, which can be dangerous. When the
836 keyword is not being used or when None is passed all
837 publishing will happen synchronously and a warning message
838 will be printed.
839 @type queue_size: int
840 @raise ROSException: if parameters are invalid
841 """
842 super(Publisher, self).__init__(name, data_class, Registration.PUB)
843
844 if subscriber_listener:
845 self.impl.add_subscriber_listener(subscriber_listener)
846 if tcp_nodelay:
847 get_tcpros_handler().set_tcp_nodelay(self.resolved_name, tcp_nodelay)
848 if latch:
849 self.impl.enable_latch()
850 if headers:
851 self.impl.add_headers(headers)
852 if queue_size is not None:
853 self.impl.set_queue_size(queue_size)
854 else:
855 import warnings
856 warnings.warn("The publisher should be created with an explicit keyword argument 'queue_size'. "
857 "Please see http://wiki.ros.org/rospy/Overview/Publishers%20and%20Subscribers for more information.", SyntaxWarning, stacklevel=2)
858
860 """
861 Publish message data object to this topic.
862 Publish can either be called with the message instance to
863 publish or with the constructor args for a new Message
864 instance, i.e.::
865 pub.publish(message_instance)
866 pub.publish(message_field_1, message_field_2...)
867 pub.publish(message_field_1='foo', message_field_2='bar')
868
869 @param args : L{Message} instance, message arguments, or no args if keyword arguments are used
870 @param kwds : Message keyword arguments. If kwds are used, args must be unset
871 @raise ROSException: If rospy node has not been initialized
872 @raise ROSSerializationException: If unable to serialize
873 message. This is usually a type error with one of the fields.
874 """
875 if self.impl is None:
876 raise ROSException("publish() to an unregistered() handle")
877 if not is_initialized():
878 raise ROSException("ROS node has not been initialized yet. Please call init_node() first")
879 data = args_kwds_to_message(self.data_class, args, kwds)
880 try:
881 self.impl.acquire()
882 self.impl.publish(data)
883 except genpy.SerializationError as e:
884
885 _logger.error(traceback.format_exc())
886 raise ROSSerializationException(str(e))
887 finally:
888 self.impl.release()
889
891 """
892 Underlying L{_TopicImpl} implementation for publishers.
893 """
894
896 """
897 @param name: name of topic, e.g. 'laser'.
898 @type name: str
899 @param data_class: Message data class
900 @type data_class: L{Message} class
901 """
902 super(_PublisherImpl, self).__init__(name, data_class)
903 if python3 == 0:
904 self.buff = StringIO()
905 else:
906 self.buff = BytesIO()
907 self.publock = threading.RLock()
908 self.subscriber_listeners = []
909
910
911 self.headers = {}
912
913
914 self.is_latch = False
915 self.latch = None
916
917
918 self.queue_size = None
919
920
921 self.message_data_sent = 0
922
924 """close I/O and release resources"""
925 _TopicImpl.close(self)
926
927 if self.subscriber_listeners:
928 del self.subscriber_listeners[:]
929 if self.headers:
930 self.headers.clear()
931 if self.buff is not None:
932 self.buff.close()
933 self.publock = self.headers = self.buff = self.subscriber_listeners = None
934
936 """
937 Add connection headers to this Topic for future connections.
938 @param headers: key/values will be added to current connection
939 header set, overriding any existing keys if they conflict.
940 @type headers: dict
941 """
942 self.headers.update(headers)
943
945 """
946 Enable publish() latch. The latch contains the last published
947 message and is sent to any new subscribers.
948 """
949 self.is_latch = True
950
952 self.queue_size = queue_size
953
955 """
956 Get the stats for this topic publisher
957 @return: stats for topic in getBusStats() publisher format::
958 [topicName, messageDataBytes, connStats],
959 where connStats is::
960 [id, bytes, numMessages, connected]*
961 @rtype: list
962 """
963
964 conn = self.connections
965 return (self.resolved_name, self.message_data_sent,
966 [(c.id, c.stat_bytes, c.stat_num_msg, not c.done) for c in conn] )
967
969 """
970 Add a L{SubscribeListener} for subscribe events.
971 @param l: listener instance
972 @type l: L{SubscribeListener}
973 """
974 self.subscriber_listeners.append(l)
975
977 """lock for thread-safe publishing to this transport"""
978 if self.publock is not None:
979 self.publock.acquire()
980
982 """lock for thread-safe publishing to this transport"""
983 if self.publock is not None:
984 self.publock.release()
985
987 """
988 Add a connection to this topic. This must be a PubTransport. If
989 the latch is enabled, c will be sent a the value of the
990 latch.
991 @param c: connection instance
992 @type c: L{Transport}
993 @return: True if connection was added
994 @rtype: bool
995 """
996 if self.queue_size is not None:
997 c = QueuedConnection(c, self.queue_size)
998 super(_PublisherImpl, self).add_connection(c)
999 def publish_single(data):
1000 self.publish(data, connection_override=c)
1001 for l in self.subscriber_listeners:
1002 l.peer_subscribe(self.resolved_name, self.publish, publish_single)
1003 if self.is_latch and self.latch is not None:
1004 with self.publock:
1005 self.publish(self.latch, connection_override=c)
1006 return True
1007
1009 """
1010 Remove existing connection from this topic.
1011 @param c: connection instance to remove
1012 @type c: L{Transport}
1013 """
1014 super(_PublisherImpl, self).remove_connection(c)
1015 num = len(self.connections)
1016 for l in self.subscriber_listeners:
1017 l.peer_unsubscribe(self.resolved_name, num)
1018
1019 - def publish(self, message, connection_override=None):
1020 """
1021 Publish the data to the topic. If the topic has no subscribers,
1022 the method will return without any affect. Access to publish()
1023 should be locked using acquire() and release() in order to
1024 ensure proper message publish ordering.
1025
1026 @param message: message data instance to publish
1027 @type message: L{Message}
1028 @param connection_override: publish to this connection instead of all
1029 @type connection_override: L{Transport}
1030 @return: True if the data was published, False otherwise.
1031 @rtype: bool
1032 @raise genpy.SerializationError: if L{Message} instance is unable to serialize itself
1033 @raise rospy.ROSException: if topic has been closed or was closed during publish()
1034 """
1035
1036
1037 if self.closed:
1038
1039
1040 if not is_shutdown():
1041 raise ROSException("publish() to a closed topic")
1042 else:
1043 return
1044
1045 if self.is_latch:
1046 self.latch = message
1047
1048 if not self.has_connections():
1049
1050 return False
1051
1052 if connection_override is None:
1053
1054 conns = self.connections
1055 else:
1056 conns = [connection_override]
1057
1058
1059
1060 b = self.buff
1061 try:
1062 b.tell()
1063
1064
1065 self.seq += 1
1066 serialize_message(b, self.seq, message)
1067
1068
1069 err_con = []
1070 data = b.getvalue()
1071
1072 for c in conns:
1073 try:
1074 if not is_shutdown():
1075 c.write_data(data)
1076 except TransportTerminated as e:
1077 logdebug("publisher connection to [%s] terminated, see errorlog for details:\n%s"%(c.endpoint_id, traceback.format_exc()))
1078 err_con.append(c)
1079 except Exception as e:
1080
1081 logdebug("publisher connection to [%s] terminated, see errorlog for details:\n%s"%(c.endpoint_id, traceback.format_exc()))
1082 err_con.append(c)
1083
1084
1085 self.message_data_sent += b.tell()
1086 b.seek(0)
1087 b.truncate(0)
1088
1089 except ValueError:
1090
1091
1092
1093 if self.closed:
1094 if is_shutdown():
1095
1096
1097 return
1098 else:
1099
1100
1101 raise ROSException("topic was closed during publish()")
1102 else:
1103
1104 raise
1105
1106
1107 for c in err_con:
1108 try:
1109
1110
1111 c.close()
1112 except:
1113 pass
1114
1115
1116
1117
1119 """
1120 Tracks Topic objects
1121 See L{get_topic_manager()} for singleton access
1122 """
1123
1125 """ctor."""
1126 super(_TopicManager, self).__init__()
1127 self.pubs = {}
1128 self.subs = {}
1129 self.topics = set()
1130 self.lock = threading.Condition()
1131 self.closed = False
1132 _logger.info("topicmanager initialized")
1133
1135 """
1136 get topic publisher and subscriber connection info for getBusInfo() api
1137 @return: [bus info stats]
1138 See getBusInfo() API for more data structure details.
1139 @rtype: list
1140 """
1141 with self.lock:
1142 info = []
1143 for s in chain(iter(self.pubs.values()), iter(self.subs.values())):
1144 info.extend(s.get_stats_info())
1145 return info
1146
1148 """
1149 get topic publisher and subscriber stats for getBusStats() api
1150 @return: [publisherStats, subscriberStats].
1151 See getBusStats() API for more data structure details.
1152 @rtype: list
1153 """
1154 with self.lock:
1155 return [s.get_stats() for s in self.pubs.values()],\
1156 [s.get_stats() for s in self.subs.values()]
1157
1159 """
1160 Close all registered publication and subscriptions. Manager is
1161 no longer usable after close.
1162 """
1163 self.closed = True
1164 with self.lock:
1165 for t in chain(iter(self.pubs.values()), iter(self.subs.values())):
1166 t.close()
1167 self.pubs.clear()
1168 self.subs.clear()
1169
1170
1172 """
1173 Check all registered publication and subscriptions.
1174 """
1175 with self.lock:
1176 for t in chain(list(self.pubs.values()), list(self.subs.values())):
1177 t.check()
1178
1179 - def _add(self, ps, rmap, reg_type):
1180 """
1181 Add L{_TopicImpl} instance to rmap
1182 @param ps: a pub/sub impl instance
1183 @type ps: L{_TopicImpl}
1184 @param rmap: { topic: _TopicImpl} rmap to record instance in
1185 @type rmap: dict
1186 @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB}
1187 @type reg_type: str
1188 """
1189 resolved_name = ps.resolved_name
1190 _logger.debug("tm._add: %s, %s, %s", resolved_name, ps.type, reg_type)
1191 with self.lock:
1192 rmap[resolved_name] = ps
1193 self.topics.add(resolved_name)
1194
1195
1196
1197 get_registration_listeners().notify_added(resolved_name, ps.type, reg_type)
1198
1200 """recalculate self.topics. expensive"""
1201 self.topics = set([x.resolved_name for x in self.pubs.values()] +
1202 [x.resolved_name for x in self.subs.values()])
1203
1204 - def _remove(self, ps, rmap, reg_type):
1205 """
1206 Remove L{_TopicImpl} instance from rmap
1207 @param ps: a pub/sub impl instance
1208 @type ps: L{_TopicImpl}
1209 @param rmap: topic->_TopicImpl rmap to remove instance in
1210 @type rmap: dict
1211 @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB}
1212 @type reg_type: str
1213 """
1214 resolved_name = ps.resolved_name
1215 _logger.debug("tm._remove: %s, %s, %s", resolved_name, ps.type, reg_type)
1216 with self.lock:
1217 del rmap[resolved_name]
1218 self. _recalculate_topics()
1219
1220
1221
1222 get_registration_listeners().notify_removed(resolved_name, ps.type, reg_type)
1223
1224 - def get_impl(self, reg_type, resolved_name):
1225 """
1226 Get the L{_TopicImpl} for the specified topic. This is mainly for
1227 testing purposes. Unlike acquire_impl, it does not alter the
1228 ref count.
1229 @param resolved_name: resolved topic name
1230 @type resolved_name: str
1231 @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB}
1232 @type reg_type: str
1233 """
1234 if reg_type == Registration.PUB:
1235 rmap = self.pubs
1236 elif reg_type == Registration.SUB:
1237 rmap = self.subs
1238 else:
1239 raise TypeError("invalid reg_type: %s"%s)
1240 return rmap.get(resolved_name, None)
1241
1242 - def acquire_impl(self, reg_type, resolved_name, data_class):
1243 """
1244 Acquire a L{_TopicImpl} for the specified topic (create one if it
1245 doesn't exist). Every L{Topic} instance has a _TopicImpl that
1246 actually controls the topic resources so that multiple Topic
1247 instances use the same underlying connections. 'Acquiring' a
1248 topic implementation marks that another Topic instance is
1249 using the TopicImpl.
1250
1251 @param resolved_name: resolved topic name
1252 @type resolved_name: str
1253
1254 @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB}
1255 @type reg_type: str
1256
1257 @param data_class: message class for topic
1258 @type data_class: L{Message} class
1259 """
1260 if reg_type == Registration.PUB:
1261 rmap = self.pubs
1262 impl_class = _PublisherImpl
1263 elif reg_type == Registration.SUB:
1264 rmap = self.subs
1265 impl_class = _SubscriberImpl
1266 else:
1267 raise TypeError("invalid reg_type: %s"%reg_type)
1268 with self.lock:
1269 impl = rmap.get(resolved_name, None)
1270 if not impl:
1271 impl = impl_class(resolved_name, data_class)
1272 self._add(impl, rmap, reg_type)
1273 impl.ref_count += 1
1274 return impl
1275
1277 """
1278 Release a L_{TopicImpl} for the specified topic.
1279
1280 Every L{Topic} instance has a _TopicImpl that actually
1281 controls the topic resources so that multiple Topic instances
1282 use the same underlying connections. 'Acquiring' a topic
1283 implementation marks that another Topic instance is using the
1284 TopicImpl.
1285
1286 @param resolved_name: resolved topic name
1287 @type resolved_name: str
1288 @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB}
1289 @type reg_type: str
1290 """
1291 if reg_type == Registration.PUB:
1292 rmap = self.pubs
1293 else:
1294 rmap = self.subs
1295 with self.lock:
1296
1297 if self.closed:
1298 return
1299 impl = rmap.get(resolved_name, None)
1300 assert impl is not None, "cannot release topic impl as impl [%s] does not exist"%resolved_name
1301 impl.ref_count -= 1
1302 assert impl.ref_count >= 0, "topic impl's reference count has gone below zero"
1303 if impl.ref_count == 0:
1304 rospyinfo("topic impl's ref count is zero, deleting topic %s...", resolved_name)
1305 impl.close()
1306 self._remove(impl, rmap, reg_type)
1307 del impl
1308 _logger.debug("... done deleting topic %s", resolved_name)
1309
1311 """
1312 @param resolved_name: resolved topic name
1313 @type resolved_name: str
1314 @return: list of L{_PublisherImpl}s
1315 @rtype: [L{_PublisherImpl}]
1316 """
1317 return self.pubs.get(resolved_name, None)
1318
1320 """
1321 @param resolved_name: topic name
1322 @type resolved_name: str
1323 @return: subscriber for the specified topic.
1324 @rtype: L{_SubscriberImpl}
1325 """
1326 return self.subs.get(resolved_name, None)
1327
1329 """
1330 @param resolved_name: resolved topic name
1331 @type resolved_name: str
1332 @return: True if manager has subscription for specified topic
1333 @rtype: bool
1334 """
1335 return resolved_name in self.subs
1336
1338 """
1339 @param resolved_name: resolved topic name
1340 @type resolved_name: str
1341 @return: True if manager has publication for specified topic
1342 @rtype: bool
1343 """
1344 return resolved_name in self.pubs
1345
1347 """
1348 @return: list of topic names this node subscribes to/publishes
1349 @rtype: [str]
1350 """
1351 return self.topics.copy()
1352
1354 return [[k, v.type] for k, v in rmap.items()]
1355
1356
1358 return self._get_list(self.subs)
1359
1360
1362 return self._get_list(self.pubs)
1363
1364 set_topic_manager(_TopicManager())
1365