1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 """
36 rospy implementation of topics.
37
38 Client API
39 ==========
40
41 L{Publisher} and L{Subscriber} are the client API for topics.
42
43 Internal Implementation
44 =======================
45
46 Internally, L{_TopicImpl} instances (_PublisherImpl/_SubscriberImpl)
47 are used to manage actual transport connections. The L{_TopicManager}
48 is responsible for tracking the system-wide state of publications and
49 subscriptions as well as the L{_TopicImpl} instances. More info is below.
50
51 L{_TopicManager}
52 ================
53
54 The L{_TopicManager} does the backend topic bookkeeping for the local
55 node. Use L{get_topic_manager()} to access singleton. Actual topic
56 implementations are done through the
57 L{_TopicImpl}/L{_PublisherImpl}/L{_SubscriberImpl} hierarchy. Client
58 code generates instances of type L{Publisher}/L{Subscriber}, which
59 enable to client to create multiple publishers/subscribers of that
60 topic that get controlled access to the underlying share connections.
61
62 Common parent classes for all rospy topics. The rospy topic autogenerators
63 create classes that are children of these implementations.
64 """
65
66
67 import struct
68 import select
69 try:
70 from cStringIO import StringIO
71 import thread as _thread
72 python3 = 0
74 return isinstance(s, basestring)
75 except ImportError:
76 python3 = 1
77 from io import StringIO, BytesIO
78 import _thread
80 return isinstance(s, str)
81
82 import threading
83 import logging
84 import time
85
86 from itertools import chain
87 import traceback
88
89 import rosgraph.names
90
91 from rospy.core import *
92 from rospy.exceptions import ROSSerializationException, TransportTerminated
93 from rospy.msg import serialize_message, args_kwds_to_message
94
95 from rospy.impl.registration import get_topic_manager, set_topic_manager, Registration, get_registration_listeners
96 from rospy.impl.tcpros import get_tcpros_handler, DEFAULT_BUFF_SIZE
97
98 _logger = logging.getLogger('rospy.topics')
99
100
101 import genpy
102 Message = genpy.Message
103
104
105
106
107
108
109
110
112 """Base class of L{Publisher} and L{Subscriber}"""
113
114 - def __init__(self, name, data_class, reg_type):
115 """
116 @param name: graph resource name of topic, e.g. 'laser'.
117 @type name: str
118 @param data_class: message class for serialization
119 @type data_class: L{Message}
120 @param reg_type Registration.PUB or Registration.SUB
121 @type reg_type: str
122 @raise ValueError: if parameters are invalid
123 """
124
125 if not name or not isstring(name):
126 raise ValueError("topic name is not a non-empty string")
127 try:
128 if python3 == 1:
129 name.encode("utf-8")
130 else:
131 name = name.encode("utf-8")
132 except UnicodeError:
133 raise ValueError("topic name must be ascii/utf-8 compatible")
134 if data_class is None:
135 raise ValueError("topic parameter 'data_class' is not initialized")
136 if not type(data_class) == type:
137 raise ValueError("data_class [%s] is not a class"%data_class)
138 if not issubclass(data_class, genpy.Message):
139 raise ValueError("data_class [%s] is not a message data class"%data_class.__class__.__name__)
140
141 if not rosgraph.names.is_legal_name(name):
142 import warnings
143 warnings.warn("'%s' is not a legal ROS graph resource name. This may cause problems with other ROS tools"%name, stacklevel=2)
144
145
146
147 if not rospy.core.is_initialized():
148 self.resolved_name = rospy.names.resolve_name_without_node_name(name)
149 else:
150
151 self.resolved_name = resolve_name(name)
152
153 self.name = self.resolved_name
154
155 self.data_class = data_class
156 self.type = data_class._type
157 self.md5sum = data_class._md5sum
158 self.reg_type = reg_type
159 self.impl = get_topic_manager().acquire_impl(reg_type, self.resolved_name, data_class)
160
162 """
163 get the number of connections to other ROS nodes for this topic. For a Publisher,
164 this corresponds to the number of nodes subscribing. For a Subscriber, the number
165 of publishers.
166 @return: number of connections
167 @rtype: int
168 """
169 return self.impl.get_num_connections()
170
172 """
173 unpublish/unsubscribe from topic. Topic instance is no longer
174 valid after this call. Additional calls to unregister() have no effect.
175 """
176
177
178 resolved_name = self.resolved_name
179 if resolved_name and self.impl:
180 get_topic_manager().release_impl(self.reg_type, resolved_name)
181 self.impl = self.resolved_name = self.type = self.md5sum = self.data_class = None
182
183
185 """
186 select.poll/kqueue abstraction to handle socket failure detection
187 on multiple platforms. NOT thread-safe.
188 """
190 try:
191 self.poller = select.poll()
192 self.add_fd = self.add_poll
193 self.remove_fd = self.remove_poll
194 self.error_iter = self.error_poll_iter
195 except:
196 try:
197
198 self.poller = select.kqueue()
199 self.add_fd = self.add_kqueue
200 self.remove_fd = self.remove_kqueue
201 self.error_iter = self.error_kqueue_iter
202 self.kevents = []
203 except:
204
205 self.poller = self.noop
206 self.add_fd = self.noop
207 self.remove_fd = self.noop
208 self.error_iter = self.noop
209
210 - def noop(self, *args):
212
215
218
220 events = self.poller.poll(0)
221 for fd, event in events:
222 if event & (select.POLLHUP | select.POLLERR):
223 yield fd
224
226 self.kevents.append(select.kevent(fd))
227
229 events = self.poller.control(self.kevents, 0)
230 for event in events:
231 if event & (select.KQ_EV_ERROR | select.KQ_EV_EOF):
232 yield event.ident
233
235 e = [x for x in self.kevents if x.ident == fd]
236 for x in e:
237 self.kevents.remove(x)
238
240 """
241 Base class of internal topic implementations. Each topic has a
242 singleton _TopicImpl implementation for managing the underlying
243 connections.
244 """
245
247 """
248 Base constructor
249 @param name: graph resource name of topic, e.g. 'laser'.
250 @type name: str
251 @param data_class: message data class
252 @type data_class: L{Message}
253 """
254
255
256 self.resolved_name = resolve_name(name)
257 self.name = self.resolved_name
258
259 self.data_class = data_class
260 self.type = data_class._type
261 self.handler = None
262 self.seq = 0
263
264
265
266
267
268
269
270 self.c_lock = threading.RLock()
271 self.connections = []
272 self.closed = False
273
274 self.ref_count = 0
275
276 self.connection_poll = Poller()
277
279
280 if self.closed:
281 return
282 if self.connections is not None:
283 for c in self.connections:
284 try:
285 c.close()
286 except:
287 pass
288 del self.connections[:]
289 self.c_lock = self.connections = self.handler = self.data_class = self.type = None
290
292 """close I/O"""
293 if self.closed:
294 return
295 self.closed = True
296 if self.c_lock is not None:
297 with self.c_lock:
298 for c in self.connections:
299 try:
300 if c is not None:
301 c.close()
302 except:
303
304 _logger.error(traceback.format_exc())
305 del self.connections[:]
306 self.handler = None
307
309 with self.c_lock:
310 return len(self.connections)
311
313 """
314 Query whether or not a connection with the associated \a
315 endpoint has been added to this object.
316 @param endpoint_id: endpoint ID associated with connection.
317 @type endpoint_id: str
318 """
319
320 conn = self.connections
321 for c in conn:
322 if c.endpoint_id == endpoint_id:
323 return True
324 return False
325
327 """
328 Check to see if this topic is connected to other publishers/subscribers
329 @return: True if topic is connected
330 @rtype: bool
331 """
332 if self.connections:
333 return True
334 return False
335
337
338 try:
339 self.connection_poll.remove_fd(c.fileno())
340 except:
341 pass
342 try:
343
344
345 c.close()
346 except:
347 pass
348 if c in connections:
349 connections.remove(c)
350
352 """
353 Add a connection to this topic. If any previous connections
354 to same endpoint exist, drop them.
355
356 @param c: connection instance
357 @type c: Transport
358 @return: True if connection was added, ``bool``
359 """
360 rospyinfo("topic[%s] adding connection to [%s], count %s"%(self.resolved_name, c.endpoint_id, len(self.connections)))
361 with self.c_lock:
362
363
364
365 new_connections = self.connections[:]
366
367
368
369 for oldc in self.connections:
370 if oldc.endpoint_id == c.endpoint_id:
371 self._remove_connection(new_connections, oldc)
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387 for fd in self.connection_poll.error_iter():
388 to_remove = [x for x in new_connections if x.fileno() == fd]
389 for x in to_remove:
390 rospydebug("removing connection to %s, connection error detected"%(x.endpoint_id))
391 self._remove_connection(new_connections, x)
392
393
394
395 new_fd = c.fileno()
396 if new_fd is not None:
397 self.connection_poll.add_fd(new_fd)
398
399
400 new_connections.append(c)
401
402 self.connections = new_connections
403
404
405 c.set_cleanup_callback(self.remove_connection)
406
407 return True
408
410 """
411 Remove connection from topic.
412 @param c: connection instance to remove
413 @type c: Transport
414 """
415 rospyinfo("topic[%s] removing connection to %s"%(self.resolved_name, c.endpoint_id))
416 with self.c_lock:
417
418
419
420 new_connections = self.connections[:]
421 self._remove_connection(new_connections, c)
422 self.connections = new_connections
423
425 """
426 Get the stats for this topic
427 @return: stats for topic in getBusInfo() format::
428 ((connection_id, destination_caller_id, direction, transport, topic_name, connected)*)
429 @rtype: list
430 """
431
432 connections = self.connections
433 return [(c.id, c.endpoint_id, c.direction, c.transport_type, self.resolved_name, True) for c in connections]
434
436 """Get the stats for this topic (API stub)"""
437 raise Exception("subclasses must override")
438
439
440
441
442
444 """
445 Class for registering as a subscriber to a specified topic, where
446 the messages are of a given type.
447 """
448 - def __init__(self, name, data_class, callback=None, callback_args=None,
449 queue_size=None, buff_size=DEFAULT_BUFF_SIZE, tcp_nodelay=False):
450 """
451 Constructor.
452
453 NOTE: for the queue_size and buff_size
454 parameters, rospy does not attempt to do intelligent merging
455 between multiple Subscriber instances for the same topic. As
456 they share the same underlying transport, multiple Subscribers
457 to the same topic can conflict with one another if they set
458 these parameters differently.
459
460 @param name: graph resource name of topic, e.g. 'laser'.
461 @type name: str
462 @param data_class: data type class to use for messages,
463 e.g. std_msgs.msg.String
464 @type data_class: L{Message} class
465 @param callback: function to call ( fn(data)) when data is
466 received. If callback_args is set, the function must accept
467 the callback_args as a second argument, i.e. fn(data,
468 callback_args). NOTE: Additional callbacks can be added using
469 add_callback().
470 @type callback: str
471 @param callback_args: additional arguments to pass to the
472 callback. This is useful when you wish to reuse the same
473 callback for multiple subscriptions.
474 @type callback_args: any
475 @param queue_size: maximum number of messages to receive at
476 a time. This will generally be 1 or None (infinite,
477 default). buff_size should be increased if this parameter
478 is set as incoming data still needs to sit in the incoming
479 buffer before being discarded. Setting queue_size
480 buff_size to a non-default value affects all subscribers to
481 this topic in this process.
482 @type queue_size: int
483 @param buff_size: incoming message buffer size in bytes. If
484 queue_size is set, this should be set to a number greater
485 than the queue_size times the average message size. Setting
486 buff_size to a non-default value affects all subscribers to
487 this topic in this process.
488 @type buff_size: int
489 @param tcp_nodelay: if True, request TCP_NODELAY from
490 publisher. Use of this option is not generally recommended
491 in most cases as it is better to rely on timestamps in
492 message data. Setting tcp_nodelay to True enables TCP_NODELAY
493 for all subscribers in the same python process.
494 @type tcp_nodelay: bool
495 @raise ROSException: if parameters are invalid
496 """
497 super(Subscriber, self).__init__(name, data_class, Registration.SUB)
498
499
500
501
502 if queue_size is not None:
503 self.impl.set_queue_size(queue_size)
504 if buff_size != DEFAULT_BUFF_SIZE:
505 self.impl.set_buff_size(buff_size)
506
507 if callback is not None:
508
509
510
511 self.impl.add_callback(callback, callback_args)
512
513 self.callback = callback
514 self.callback_args = callback_args
515 else:
516
517 self.callback = self.callback_args = None
518 if tcp_nodelay:
519 self.impl.set_tcp_nodelay(tcp_nodelay)
520
522 """
523 unpublish/unsubscribe from topic. Topic instance is no longer
524 valid after this call. Additional calls to unregister() have no effect.
525 """
526 if self.impl:
527
528
529 if self.callback is not None:
530 self.impl.remove_callback(self.callback, self.callback_args)
531 self.callback = self.callback_args = None
532 super(Subscriber, self).unregister()
533
535 """
536 Underyling L{_TopicImpl} implementation for subscriptions.
537 """
539 """
540 ctor.
541 @param name: graph resource name of topic, e.g. 'laser'.
542 @type name: str
543 @param data_class: Message data class
544 @type data_class: L{Message} class
545 """
546 super(_SubscriberImpl, self).__init__(name, data_class)
547
548
549
550 self.callbacks = []
551 self.queue_size = None
552 self.buff_size = DEFAULT_BUFF_SIZE
553 self.tcp_nodelay = False
554
556 """close I/O and release resources"""
557 _TopicImpl.close(self)
558 if self.callbacks:
559 del self.callbacks[:]
560 self.callbacks = None
561
563 """
564 Set the value of TCP_NODELAY, which causes the Nagle algorithm
565 to be disabled for future topic connections, if the publisher
566 supports it.
567 """
568 self.tcp_nodelay = tcp_nodelay
569
571 """
572 Set the receive queue size. If more than queue_size messages
573 are waiting to be deserialized, they are discarded.
574
575 @param queue_size int: incoming queue size. Must be positive integer or None.
576 @type queue_size: int
577 """
578 if queue_size == -1:
579 self.queue_size = None
580 elif queue_size == 0:
581 raise ROSException("queue size may not be set to zero")
582 elif queue_size is not None and type(queue_size) != int:
583 raise ROSException("queue size must be an integer")
584 else:
585 self.queue_size = queue_size
586
588 """
589 Set the receive buffer size. The exact meaning of this is
590 transport dependent.
591 @param buff_size: receive buffer size
592 @type buff_size: int
593 """
594 if type(buff_size) != int:
595 raise ROSException("buffer size must be an integer")
596 elif buff_size <= 0:
597 raise ROSException("buffer size must be a positive integer")
598 self.buff_size = buff_size
599
601 """
602 Get the stats for this topic subscriber
603 @return: stats for topic in getBusStats() publisher format::
604 (topicName, connStats)
605 where connStats is::
606 [connectionId, bytesReceived, numSent, dropEstimate, connected]*
607 @rtype: list
608 """
609
610 conn = self.connections
611
612 stats = (self.resolved_name,
613 [(c.id, c.stat_bytes, c.stat_num_msg, -1, not c.done)
614 for c in conn] )
615 return stats
616
618 """
619 Register a callback to be invoked whenever a new message is received
620 @param cb: callback function to invoke with message data
621 instance, i.e. fn(data). If callback args is set, they will
622 be passed in as the second argument.
623 @type cb: fn(msg)
624 @param cb_cargs: additional arguments to pass to callback
625 @type cb_cargs: Any
626 """
627 if self.closed:
628 raise ROSException("subscriber [%s] has been closed"%(self.resolved_name))
629 with self.c_lock:
630
631
632 new_callbacks = self.callbacks[:]
633 new_callbacks.append((cb, cb_args))
634 self.callbacks = new_callbacks
635
636
637 for c in self.connections:
638 if c.latch is not None:
639 self._invoke_callback(c.latch, cb, cb_args)
640
642 """
643 Unregister a message callback.
644 @param cb: callback function
645 @type cb: fn(msg)
646 @param cb_cargs: additional arguments associated with callback
647 @type cb_cargs: Any
648 @raise KeyError: if no matching callback
649 """
650 if self.closed:
651 return
652 with self.c_lock:
653
654
655 matches = [x for x in self.callbacks if x[0] == cb and x[1] == cb_args]
656 if matches:
657 new_callbacks = self.callbacks[:]
658
659 new_callbacks.remove(matches[0])
660 self.callbacks = new_callbacks
661 if not matches:
662 raise KeyError("no matching cb")
663
665 """
666 Invoke callback on msg. Traps and logs any exceptions raise by callback
667 @param msg: message data
668 @type msg: L{Message}
669 @param cb: callback
670 @type cb: fn(msg, cb_args)
671 @param cb_args: callback args or None
672 @type cb_args: Any
673 """
674 try:
675 if cb_args is not None:
676 cb(msg, cb_args)
677 else:
678 cb(msg)
679 except Exception as e:
680 if not is_shutdown():
681 logerr("bad callback: %s\n%s"%(cb, traceback.format_exc()))
682 else:
683 _logger.warn("during shutdown, bad callback: %s\n%s"%(cb, traceback.format_exc()))
684
686 """
687 Called by underlying connection transport for each new message received
688 @param msgs: message data
689 @type msgs: [L{Message}]
690 """
691
692 callbacks = self.callbacks
693 for msg in msgs:
694 for cb, cb_args in callbacks:
695 self._invoke_callback(msg, cb, cb_args)
696
698 """
699 Callback API to receive notifications when new subscribers
700 connect and disconnect.
701 """
702
704 """
705 callback when a peer has subscribed from a topic
706 @param topic_name: topic name. NOTE: topic name will be resolved/remapped
707 @type topic_name: str
708 @param topic_publish: method to publish message data to all subscribers
709 @type topic_publish: fn(data)
710 @param peer_publish: method to publish message data to
711 new subscriber. NOTE: behavior for the latter is
712 transport-dependent as some transports may be broadcast only.
713 @type peer_publish: fn(data)
714 """
715 pass
716
718 """
719 callback when a peer has unsubscribed from a topic
720 @param topic_name: topic name. NOTE: topic name will be resolved/remapped
721 @type topic_name: str
722 @param num_peers: number of remaining peers subscribed to topic
723 @type num_peers: int
724 """
725 pass
726
727
728
729
730
731
732
734 """
735 Class for registering as a publisher of a ROS topic.
736 """
737
738 - def __init__(self, name, data_class, subscriber_listener=None, tcp_nodelay=False, latch=False, headers=None):
739 """
740 Constructor
741 @param name: resource name of topic, e.g. 'laser'.
742 @type name: str
743 @param data_class: message class for serialization
744 @type data_class: L{Message} class
745 @param subscriber_listener: listener for
746 subscription events. May be None.
747 @type subscriber_listener: L{SubscribeListener}
748 @param tcp_nodelay: If True, sets TCP_NODELAY on
749 publisher's socket (disables Nagle algorithm). This results
750 in lower latency publishing at the cost of efficiency.
751 @type tcp_nodelay: bool
752 @param latch: If True, the last message published is
753 'latched', meaning that any future subscribers will be sent
754 that message immediately upon connection.
755 @type latch: bool
756 @raise ROSException: if parameters are invalid
757 """
758 super(Publisher, self).__init__(name, data_class, Registration.PUB)
759
760 if subscriber_listener:
761 self.impl.add_subscriber_listener(subscriber_listener)
762 if tcp_nodelay:
763 get_tcpros_handler().set_tcp_nodelay(self.resolved_name, tcp_nodelay)
764 if latch:
765 self.impl.enable_latch()
766 if headers:
767 self.impl.add_headers(headers)
768
770 """
771 Publish message data object to this topic.
772 Publish can either be called with the message instance to
773 publish or with the constructor args for a new Message
774 instance, i.e.::
775 pub.publish(message_instance)
776 pub.publish(message_field_1, message_field_2...)
777 pub.publish(message_field_1='foo', message_field_2='bar')
778
779 @param args : L{Message} instance, message arguments, or no args if keyword arguments are used
780 @param kwds : Message keyword arguments. If kwds are used, args must be unset
781 @raise ROSException: If rospy node has not been initialized
782 @raise ROSSerializationException: If unable to serialize
783 message. This is usually a type error with one of the fields.
784 """
785 if self.impl is None:
786 raise ROSException("publish() to an unregistered() handle")
787 if not is_initialized():
788 raise ROSException("ROS node has not been initialized yet. Please call init_node() first")
789 data = args_kwds_to_message(self.data_class, args, kwds)
790 try:
791 self.impl.acquire()
792 self.impl.publish(data)
793 except genpy.SerializationError as e:
794
795 _logger.error(traceback.format_exc(e))
796 raise ROSSerializationException(str(e))
797 finally:
798 self.impl.release()
799
801 """
802 Underyling L{_TopicImpl} implementation for publishers.
803 """
804
806 """
807 @param name: name of topic, e.g. 'laser'.
808 @type name: str
809 @param data_class: Message data class
810 @type data_class: L{Message} class
811 """
812 super(_PublisherImpl, self).__init__(name, data_class)
813 if python3 == 0:
814 self.buff = StringIO()
815 else:
816 self.buff = BytesIO()
817 self.publock = threading.RLock()
818 self.subscriber_listeners = []
819
820
821 self.headers = {}
822
823
824 self.is_latch = False
825 self.latch = None
826
827
828 self.message_data_sent = 0
829
831 """close I/O and release resources"""
832 _TopicImpl.close(self)
833
834 if self.subscriber_listeners:
835 del self.subscriber_listeners[:]
836 if self.headers:
837 self.headers.clear()
838 if self.buff is not None:
839 self.buff.close()
840 self.publock = self.headers = self.buff = self.subscriber_listeners = None
841
843 """
844 Add connection headers to this Topic for future connections.
845 @param headers: key/values will be added to current connection
846 header set, overriding any existing keys if they conflict.
847 @type headers: dict
848 """
849 self.headers.update(headers)
850
852 """
853 Enable publish() latch. The latch contains the last published
854 message and is sent to any new subscribers.
855 """
856 self.is_latch = True
857
859 """
860 Get the stats for this topic publisher
861 @return: stats for topic in getBusStats() publisher format::
862 [topicName, messageDataBytes, connStats],
863 where connStats is::
864 [id, bytes, numMessages, connected]*
865 @rtype: list
866 """
867
868 conn = self.connections
869 return (self.resolved_name, self.message_data_sent,
870 [(c.id, c.stat_bytes, c.stat_num_msg, not c.done) for c in conn] )
871
873 """
874 Add a L{SubscribeListener} for subscribe events.
875 @param l: listener instance
876 @type l: L{SubscribeListener}
877 """
878 self.subscriber_listeners.append(l)
879
881 """lock for thread-safe publishing to this transport"""
882 if self.publock is not None:
883 self.publock.acquire()
884
886 """lock for thread-safe publishing to this transport"""
887 if self.publock is not None:
888 self.publock.release()
889
891 """
892 Add a connection to this topic. This must be a PubTransport. If
893 the latch is enabled, c will be sent a the value of the
894 latch.
895 @param c: connection instance
896 @type c: L{Transport}
897 @return: True if connection was added
898 @rtype: bool
899 """
900 super(_PublisherImpl, self).add_connection(c)
901 def publish_single(data):
902 self.publish(data, connection_override=c)
903 for l in self.subscriber_listeners:
904 l.peer_subscribe(self.resolved_name, self.publish, publish_single)
905 if self.is_latch and self.latch is not None:
906 with self.publock:
907 self.publish(self.latch, connection_override=c)
908 return True
909
911 """
912 Remove existing connection from this topic.
913 @param c: connection instance to remove
914 @type c: L{Transport}
915 """
916 super(_PublisherImpl, self).remove_connection(c)
917 num = len(self.connections)
918 for l in self.subscriber_listeners:
919 l.peer_unsubscribe(self.resolved_name, num)
920
921 - def publish(self, message, connection_override=None):
922 """
923 Publish the data to the topic. If the topic has no subscribers,
924 the method will return without any affect. Access to publish()
925 should be locked using acquire() and release() in order to
926 ensure proper message publish ordering.
927
928 @param message: message data instance to publish
929 @type message: L{Message}
930 @param connection_override: publish to this connection instead of all
931 @type connection_override: L{Transport}
932 @return: True if the data was published, False otherwise.
933 @rtype: bool
934 @raise genpy.SerializationError: if L{Message} instance is unable to serialize itself
935 @raise rospy.ROSException: if topic has been closed or was closed during publish()
936 """
937
938
939 if self.closed:
940
941
942 if not is_shutdown():
943 raise ROSException("publish() to a closed topic")
944 else:
945 return
946
947 if self.is_latch:
948 self.latch = message
949
950 if not self.has_connections():
951
952 return False
953
954 if connection_override is None:
955
956 conns = self.connections
957 else:
958 conns = [connection_override]
959
960
961
962 b = self.buff
963 try:
964 b.tell()
965
966
967 self.seq += 1
968 serialize_message(b, self.seq, message)
969
970
971 err_con = []
972 data = b.getvalue()
973
974 for c in conns:
975 try:
976 if not is_shutdown():
977 c.write_data(data)
978 except TransportTerminated as e:
979 logdebug("publisher connection to [%s] terminated, see errorlog for details:\n%s"%(c.endpoint_id, traceback.format_exc()))
980 err_con.append(c)
981 except Exception as e:
982
983 logdebug("publisher connection to [%s] terminated, see errorlog for details:\n%s"%(c.endpoint_id, traceback.format_exc()))
984 err_con.append(c)
985
986
987 self.message_data_sent += b.tell()
988 b.seek(0)
989 b.truncate(0)
990
991 except ValueError:
992
993
994
995 if self.closed:
996 if is_shutdown():
997
998
999 return
1000 else:
1001
1002
1003 raise ROSException("topic was closed during publish()")
1004 else:
1005
1006 raise
1007
1008
1009 for c in err_con:
1010 try:
1011
1012
1013 c.close()
1014 except:
1015 pass
1016
1017
1018
1019
1021 """
1022 Tracks Topic objects
1023 See L{get_topic_manager()} for singleton access
1024 """
1025
1027 """ctor."""
1028 super(_TopicManager, self).__init__()
1029 self.pubs = {}
1030 self.subs = {}
1031 self.topics = set()
1032 self.lock = threading.Condition()
1033 self.closed = False
1034 _logger.info("topicmanager initialized")
1035
1037 """
1038 get topic publisher and subscriber connection info for getBusInfo() api
1039 @return: [bus info stats]
1040 See getBusInfo() API for more data structure details.
1041 @rtype: list
1042 """
1043 with self.lock:
1044 info = []
1045 for s in chain(iter(self.pubs.values()), iter(self.subs.values())):
1046 info.extend(s.get_stats_info())
1047 return info
1048
1050 """
1051 get topic publisher and subscriber stats for getBusStats() api
1052 @return: [publisherStats, subscriberStats].
1053 See getBusStats() API for more data structure details.
1054 @rtype: list
1055 """
1056 with self.lock:
1057 return [s.get_stats() for s in self.pubs.values()],\
1058 [s.get_stats() for s in self.subs.values()]
1059
1061 """
1062 Close all registered publication and subscriptions. Manager is
1063 no longer usable after close.
1064 """
1065 self.closed = True
1066 with self.lock:
1067 for t in chain(iter(self.pubs.values()), iter(self.subs.values())):
1068 t.close()
1069 self.pubs.clear()
1070 self.subs.clear()
1071
1072 - def _add(self, ps, rmap, reg_type):
1073 """
1074 Add L{_TopicImpl} instance to rmap
1075 @param ps: a pub/sub impl instance
1076 @type ps: L{_TopicImpl}
1077 @param rmap: { topic: _TopicImpl} rmap to record instance in
1078 @type rmap: dict
1079 @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB}
1080 @type reg_type: str
1081 """
1082 resolved_name = ps.resolved_name
1083 _logger.debug("tm._add: %s, %s, %s", resolved_name, ps.type, reg_type)
1084 with self.lock:
1085 rmap[resolved_name] = ps
1086 self.topics.add(resolved_name)
1087
1088
1089
1090 get_registration_listeners().notify_added(resolved_name, ps.type, reg_type)
1091
1093 """recalculate self.topics. expensive"""
1094 self.topics = set([x.resolved_name for x in self.pubs.values()] +
1095 [x.resolved_name for x in self.subs.values()])
1096
1097 - def _remove(self, ps, rmap, reg_type):
1098 """
1099 Remove L{_TopicImpl} instance from rmap
1100 @param ps: a pub/sub impl instance
1101 @type ps: L{_TopicImpl}
1102 @param rmap: topic->_TopicImpl rmap to remove instance in
1103 @type rmap: dict
1104 @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB}
1105 @type reg_type: str
1106 """
1107 resolved_name = ps.resolved_name
1108 _logger.debug("tm._remove: %s, %s, %s", resolved_name, ps.type, reg_type)
1109 with self.lock:
1110 del rmap[resolved_name]
1111 self. _recalculate_topics()
1112
1113
1114
1115 get_registration_listeners().notify_removed(resolved_name, ps.type, reg_type)
1116
1117 - def get_impl(self, reg_type, resolved_name):
1118 """
1119 Get the L{_TopicImpl} for the specified topic. This is mainly for
1120 testing purposes. Unlike acquire_impl, it does not alter the
1121 ref count.
1122 @param resolved_name: resolved topic name
1123 @type resolved_name: str
1124 @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB}
1125 @type reg_type: str
1126 """
1127 if reg_type == Registration.PUB:
1128 rmap = self.pubs
1129 elif reg_type == Registration.SUB:
1130 rmap = self.subs
1131 else:
1132 raise TypeError("invalid reg_type: %s"%s)
1133 return rmap.get(resolved_name, None)
1134
1135 - def acquire_impl(self, reg_type, resolved_name, data_class):
1136 """
1137 Acquire a L{_TopicImpl} for the specified topic (create one if it
1138 doesn't exist). Every L{Topic} instance has a _TopicImpl that
1139 actually controls the topic resources so that multiple Topic
1140 instances use the same underlying connections. 'Acquiring' a
1141 topic implementation marks that another Topic instance is
1142 using the TopicImpl.
1143
1144 @param resolved_name: resolved topic name
1145 @type resolved_name: str
1146
1147 @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB}
1148 @type reg_type: str
1149
1150 @param data_class: message class for topic
1151 @type data_class: L{Message} class
1152 """
1153 if reg_type == Registration.PUB:
1154 rmap = self.pubs
1155 impl_class = _PublisherImpl
1156 elif reg_type == Registration.SUB:
1157 rmap = self.subs
1158 impl_class = _SubscriberImpl
1159 else:
1160 raise TypeError("invalid reg_type: %s"%s)
1161 with self.lock:
1162 impl = rmap.get(resolved_name, None)
1163 if not impl:
1164 impl = impl_class(resolved_name, data_class)
1165 self._add(impl, rmap, reg_type)
1166 impl.ref_count += 1
1167 return impl
1168
1170 """
1171 Release a L_{TopicImpl} for the specified topic.
1172
1173 Every L{Topic} instance has a _TopicImpl that actually
1174 controls the topic resources so that multiple Topic instances
1175 use the same underlying connections. 'Acquiring' a topic
1176 implementation marks that another Topic instance is using the
1177 TopicImpl.
1178
1179 @param resolved_name: resolved topic name
1180 @type resolved_name: str
1181 @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB}
1182 @type reg_type: str
1183 """
1184 if reg_type == Registration.PUB:
1185 rmap = self.pubs
1186 else:
1187 rmap = self.subs
1188 with self.lock:
1189
1190 if self.closed:
1191 return
1192 impl = rmap.get(resolved_name, None)
1193 assert impl is not None, "cannot release topic impl as impl [%s] does not exist"%resolved_name
1194 impl.ref_count -= 1
1195 assert impl.ref_count >= 0, "topic impl's reference count has gone below zero"
1196 if impl.ref_count == 0:
1197 rospyinfo("topic impl's ref count is zero, deleting topic %s...", resolved_name)
1198 impl.close()
1199 self._remove(impl, rmap, reg_type)
1200 del impl
1201 _logger.debug("... done deleting topic %s", resolved_name)
1202
1204 """
1205 @param resolved_name: resolved topic name
1206 @type resolved_name: str
1207 @return: list of L{_PublisherImpl}s
1208 @rtype: [L{_PublisherImpl}]
1209 """
1210 return self.pubs.get(resolved_name, None)
1211
1213 """
1214 @param resolved_name: topic name
1215 @type resolved_name: str
1216 @return: subscriber for the specified topic.
1217 @rtype: L{_SubscriberImpl}
1218 """
1219 return self.subs.get(resolved_name, None)
1220
1222 """
1223 @param resolved_name: resolved topic name
1224 @type resolved_name: str
1225 @return: True if manager has subscription for specified topic
1226 @rtype: bool
1227 """
1228 return resolved_name in self.subs
1229
1231 """
1232 @param resolved_name: resolved topic name
1233 @type resolved_name: str
1234 @return: True if manager has publication for specified topic
1235 @rtype: bool
1236 """
1237 return resolved_name in self.pubs
1238
1240 """
1241 @return: list of topic names this node subscribes to/publishes
1242 @rtype: [str]
1243 """
1244 return self.topics
1245
1247 return [[k, v.type] for k, v in rmap.items()]
1248
1249
1251 return self._get_list(self.subs)
1252
1253
1255 return self._get_list(self.pubs)
1256
1257 set_topic_manager(_TopicManager())
1258