1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 """
36 rospy implementation of topics.
37
38 Client API
39 ==========
40
41 L{Publisher} and L{Subscriber} are the client API for topics.
42
43 Internal Implementation
44 =======================
45
46 Internally, L{_TopicImpl} instances (_PublisherImpl/_SubscriberImpl)
47 are used to manage actual transport connections. The L{_TopicManager}
48 is responsible for tracking the system-wide state of publications and
49 subscriptions as well as the L{_TopicImpl} instances. More info is below.
50
51 L{_TopicManager}
52 ================
53
54 The L{_TopicManager} does the backend topic bookkeeping for the local
55 node. Use L{get_topic_manager()} to access singleton. Actual topic
56 implementations are done through the
57 L{_TopicImpl}/L{_PublisherImpl}/L{_SubscriberImpl} hierarchy. Client
58 code generates instances of type L{Publisher}/L{Subscriber}, which
59 enable to client to create multiple publishers/subscribers of that
60 topic that get controlled access to the underlying share connections.
61
62 Common parent classes for all rospy topics. The rospy topic autogenerators
63 create classes that are children of these implementations.
64 """
65
66
67 import struct
68 import select
69 try:
70 from cStringIO import StringIO
71 python3 = 0
73 return isinstance(s, basestring)
74 except ImportError:
75 python3 = 1
76 from io import StringIO, BytesIO
78 return isinstance(s, str)
79
80 import threading
81 import logging
82 import time
83
84 from itertools import chain
85 import traceback
86
87 import rosgraph.names
88
89 from rospy.core import *
90 from rospy.exceptions import ROSSerializationException, TransportTerminated
91 from rospy.msg import serialize_message, args_kwds_to_message
92
93 from rospy.impl.registration import get_topic_manager, set_topic_manager, Registration, get_registration_listeners
94 from rospy.impl.tcpros import get_tcpros_handler, DEFAULT_BUFF_SIZE
95 from rospy.impl.tcpros_pubsub import QueuedConnection
96
97 _logger = logging.getLogger('rospy.topics')
98
99
100 import genpy
101 Message = genpy.Message
102
103
104
105
106
107
108
109
111 """Base class of L{Publisher} and L{Subscriber}"""
112
113 - def __init__(self, name, data_class, reg_type):
114 """
115 @param name: graph resource name of topic, e.g. 'laser'.
116 @type name: str
117 @param data_class: message class for serialization
118 @type data_class: L{Message}
119 @param reg_type Registration.PUB or Registration.SUB
120 @type reg_type: str
121 @raise ValueError: if parameters are invalid
122 """
123
124 if not name or not isstring(name):
125 raise ValueError("topic name is not a non-empty string")
126 try:
127 if python3 == 1:
128 name.encode("utf-8")
129 else:
130 name = name.encode("utf-8")
131 except UnicodeError:
132 raise ValueError("topic name must be ascii/utf-8 compatible")
133 if data_class is None:
134 raise ValueError("topic parameter 'data_class' is not initialized")
135 if not type(data_class) == type:
136 raise ValueError("data_class [%s] is not a class"%data_class)
137 if not issubclass(data_class, genpy.Message):
138 raise ValueError("data_class [%s] is not a message data class"%data_class.__class__.__name__)
139
140 if not rosgraph.names.is_legal_name(name):
141 import warnings
142 warnings.warn("'%s' is not a legal ROS graph resource name. This may cause problems with other ROS tools"%name, stacklevel=2)
143
144
145
146 if not rospy.core.is_initialized():
147 self.resolved_name = rospy.names.resolve_name_without_node_name(name)
148 else:
149
150 self.resolved_name = resolve_name(name)
151
152 self.name = self.resolved_name
153
154 self.data_class = data_class
155 self.type = data_class._type
156 self.md5sum = data_class._md5sum
157 self.reg_type = reg_type
158 self.impl = get_topic_manager().acquire_impl(reg_type, self.resolved_name, data_class)
159
161 """
162 get the number of connections to other ROS nodes for this topic. For a Publisher,
163 this corresponds to the number of nodes subscribing. For a Subscriber, the number
164 of publishers.
165 @return: number of connections
166 @rtype: int
167 """
168 return self.impl.get_num_connections()
169
171 """
172 unpublish/unsubscribe from topic. Topic instance is no longer
173 valid after this call. Additional calls to unregister() have no effect.
174 """
175
176
177 resolved_name = self.resolved_name
178 if resolved_name and self.impl:
179 get_topic_manager().release_impl(self.reg_type, resolved_name)
180 self.impl = self.resolved_name = self.type = self.md5sum = self.data_class = None
181
182
184 """
185 select.poll/kqueue abstraction to handle socket failure detection
186 on multiple platforms. NOT thread-safe.
187 """
189 try:
190 self.poller = select.poll()
191 self.add_fd = self.add_poll
192 self.remove_fd = self.remove_poll
193 self.error_iter = self.error_poll_iter
194 except:
195 try:
196
197 self.poller = select.kqueue()
198 self.add_fd = self.add_kqueue
199 self.remove_fd = self.remove_kqueue
200 self.error_iter = self.error_kqueue_iter
201 self.kevents = []
202 except:
203
204 self.poller = self.noop
205 self.add_fd = self.noop
206 self.remove_fd = self.noop
207 self.error_iter = self.noop_iter
208
209 - def noop(self, *args):
211
213 empty_generator = (x for x in list())
214 for x in empty_generator:
215 yield x
216
219
222
224 events = self.poller.poll(0)
225 for fd, event in events:
226 if event & (select.POLLHUP | select.POLLERR):
227 yield fd
228
230 self.kevents.append(select.kevent(fd))
231
233 events = self.poller.control(self.kevents, 0)
234 for event in events:
235 if event & (select.KQ_EV_ERROR | select.KQ_EV_EOF):
236 yield event.ident
237
239 e = [x for x in self.kevents if x.ident == fd]
240 for x in e:
241 self.kevents.remove(x)
242
244 """
245 Base class of internal topic implementations. Each topic has a
246 singleton _TopicImpl implementation for managing the underlying
247 connections.
248 """
249
251 """
252 Base constructor
253 @param name: graph resource name of topic, e.g. 'laser'.
254 @type name: str
255 @param data_class: message data class
256 @type data_class: L{Message}
257 """
258
259
260 self.resolved_name = resolve_name(name)
261 self.name = self.resolved_name
262
263 self.data_class = data_class
264 self.type = data_class._type
265 self.handler = None
266 self.seq = 0
267
268
269
270
271
272
273
274 self.c_lock = threading.RLock()
275 self.connections = []
276 self.closed = False
277
278 self.ref_count = 0
279
280 self.connection_poll = Poller()
281
283
284 if self.closed:
285 return
286 if self.connections is not None:
287 for c in self.connections:
288 try:
289 c.close()
290 except:
291 pass
292 del self.connections[:]
293 self.c_lock = self.connections = self.handler = self.data_class = self.type = None
294
296 """close I/O"""
297 if self.closed:
298 return
299 self.closed = True
300 if self.c_lock is not None:
301 with self.c_lock:
302 for c in self.connections:
303 try:
304 if c is not None:
305 c.close()
306 except:
307
308 _logger.error(traceback.format_exc())
309 del self.connections[:]
310 self.handler = None
311
313 with self.c_lock:
314 return len(self.connections)
315
317 """
318 Query whether or not a connection with the associated \a
319 endpoint has been added to this object.
320 @param endpoint_id: endpoint ID associated with connection.
321 @type endpoint_id: str
322 """
323
324 conn = self.connections
325 for c in conn:
326 if c.endpoint_id == endpoint_id:
327 return True
328 return False
329
331 """
332 Check to see if this topic is connected to other publishers/subscribers
333 @return: True if topic is connected
334 @rtype: bool
335 """
336 if self.connections:
337 return True
338 return False
339
341
342 try:
343 self.connection_poll.remove_fd(c.fileno())
344 except:
345 pass
346 try:
347
348
349 c.close()
350 except:
351 pass
352 if c in connections:
353 connections.remove(c)
354
356 """
357 Add a connection to this topic. If any previous connections
358 to same endpoint exist, drop them.
359
360 @param c: connection instance
361 @type c: Transport
362 @return: True if connection was added, ``bool``
363 """
364 rospyinfo("topic[%s] adding connection to [%s], count %s"%(self.resolved_name, c.endpoint_id, len(self.connections)))
365 with self.c_lock:
366
367
368
369 new_connections = self.connections[:]
370
371
372
373 for oldc in self.connections:
374 if oldc.endpoint_id == c.endpoint_id:
375 self._remove_connection(new_connections, oldc)
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391 for fd in self.connection_poll.error_iter():
392 to_remove = [x for x in new_connections if x.fileno() == fd]
393 for x in to_remove:
394 rospydebug("removing connection to %s, connection error detected"%(x.endpoint_id))
395 self._remove_connection(new_connections, x)
396
397
398
399 new_fd = c.fileno()
400 if new_fd is not None:
401 self.connection_poll.add_fd(new_fd)
402
403
404 new_connections.append(c)
405
406 self.connections = new_connections
407
408
409 c.set_cleanup_callback(self.remove_connection)
410
411 return True
412
414 """
415 Remove connection from topic.
416 @param c: connection instance to remove
417 @type c: Transport
418 """
419 rospyinfo("topic[%s] removing connection to %s"%(self.resolved_name, c.endpoint_id))
420 with self.c_lock:
421
422
423
424 new_connections = self.connections[:]
425 self._remove_connection(new_connections, c)
426 self.connections = new_connections
427
429 """
430 Get the stats for this topic
431 @return: stats for topic in getBusInfo() format::
432 ((connection_id, destination_caller_id, direction, transport, topic_name, connected)*)
433 @rtype: list
434 """
435
436 connections = self.connections
437 return [(c.id, c.endpoint_id, c.direction, c.transport_type, self.resolved_name, True) for c in connections]
438
440 """Get the stats for this topic (API stub)"""
441 raise Exception("subclasses must override")
442
443
444
445
446
448 """
449 Class for registering as a subscriber to a specified topic, where
450 the messages are of a given type.
451 """
452 - def __init__(self, name, data_class, callback=None, callback_args=None,
453 queue_size=None, buff_size=DEFAULT_BUFF_SIZE, tcp_nodelay=False):
454 """
455 Constructor.
456
457 NOTE: for the queue_size and buff_size
458 parameters, rospy does not attempt to do intelligent merging
459 between multiple Subscriber instances for the same topic. As
460 they share the same underlying transport, multiple Subscribers
461 to the same topic can conflict with one another if they set
462 these parameters differently.
463
464 @param name: graph resource name of topic, e.g. 'laser'.
465 @type name: str
466 @param data_class: data type class to use for messages,
467 e.g. std_msgs.msg.String
468 @type data_class: L{Message} class
469 @param callback: function to call ( fn(data)) when data is
470 received. If callback_args is set, the function must accept
471 the callback_args as a second argument, i.e. fn(data,
472 callback_args). NOTE: Additional callbacks can be added using
473 add_callback().
474 @type callback: str
475 @param callback_args: additional arguments to pass to the
476 callback. This is useful when you wish to reuse the same
477 callback for multiple subscriptions.
478 @type callback_args: any
479 @param queue_size: maximum number of messages to receive at
480 a time. This will generally be 1 or None (infinite,
481 default). buff_size should be increased if this parameter
482 is set as incoming data still needs to sit in the incoming
483 buffer before being discarded. Setting queue_size
484 buff_size to a non-default value affects all subscribers to
485 this topic in this process.
486 @type queue_size: int
487 @param buff_size: incoming message buffer size in bytes. If
488 queue_size is set, this should be set to a number greater
489 than the queue_size times the average message size. Setting
490 buff_size to a non-default value affects all subscribers to
491 this topic in this process.
492 @type buff_size: int
493 @param tcp_nodelay: if True, request TCP_NODELAY from
494 publisher. Use of this option is not generally recommended
495 in most cases as it is better to rely on timestamps in
496 message data. Setting tcp_nodelay to True enables TCP_NODELAY
497 for all subscribers in the same python process.
498 @type tcp_nodelay: bool
499 @raise ROSException: if parameters are invalid
500 """
501 super(Subscriber, self).__init__(name, data_class, Registration.SUB)
502
503
504
505
506 if queue_size is not None:
507 self.impl.set_queue_size(queue_size)
508 if buff_size != DEFAULT_BUFF_SIZE:
509 self.impl.set_buff_size(buff_size)
510
511 if callback is not None:
512
513
514
515 self.impl.add_callback(callback, callback_args)
516
517 self.callback = callback
518 self.callback_args = callback_args
519 else:
520
521 self.callback = self.callback_args = None
522 if tcp_nodelay:
523 self.impl.set_tcp_nodelay(tcp_nodelay)
524
526 """
527 unpublish/unsubscribe from topic. Topic instance is no longer
528 valid after this call. Additional calls to unregister() have no effect.
529 """
530 if self.impl:
531
532
533 if self.callback is not None:
534 self.impl.remove_callback(self.callback, self.callback_args)
535 self.callback = self.callback_args = None
536 super(Subscriber, self).unregister()
537
539 """
540 Underyling L{_TopicImpl} implementation for subscriptions.
541 """
543 """
544 ctor.
545 @param name: graph resource name of topic, e.g. 'laser'.
546 @type name: str
547 @param data_class: Message data class
548 @type data_class: L{Message} class
549 """
550 super(_SubscriberImpl, self).__init__(name, data_class)
551
552
553
554 self.callbacks = []
555 self.queue_size = None
556 self.buff_size = DEFAULT_BUFF_SIZE
557 self.tcp_nodelay = False
558
560 """close I/O and release resources"""
561 _TopicImpl.close(self)
562 if self.callbacks:
563 del self.callbacks[:]
564 self.callbacks = None
565
567 """
568 Set the value of TCP_NODELAY, which causes the Nagle algorithm
569 to be disabled for future topic connections, if the publisher
570 supports it.
571 """
572 self.tcp_nodelay = tcp_nodelay
573
575 """
576 Set the receive queue size. If more than queue_size messages
577 are waiting to be deserialized, they are discarded.
578
579 @param queue_size int: incoming queue size. Must be positive integer or None.
580 @type queue_size: int
581 """
582 if queue_size == -1:
583 self.queue_size = None
584 elif queue_size == 0:
585 raise ROSException("queue size may not be set to zero")
586 elif queue_size is not None and type(queue_size) != int:
587 raise ROSException("queue size must be an integer")
588 else:
589 self.queue_size = queue_size
590
592 """
593 Set the receive buffer size. The exact meaning of this is
594 transport dependent.
595 @param buff_size: receive buffer size
596 @type buff_size: int
597 """
598 if type(buff_size) != int:
599 raise ROSException("buffer size must be an integer")
600 elif buff_size <= 0:
601 raise ROSException("buffer size must be a positive integer")
602 self.buff_size = buff_size
603
605 """
606 Get the stats for this topic subscriber
607 @return: stats for topic in getBusStats() publisher format::
608 (topicName, connStats)
609 where connStats is::
610 [connectionId, bytesReceived, numSent, dropEstimate, connected]*
611 @rtype: list
612 """
613
614 conn = self.connections
615
616 stats = (self.resolved_name,
617 [(c.id, c.stat_bytes, c.stat_num_msg, -1, not c.done)
618 for c in conn] )
619 return stats
620
622 """
623 Register a callback to be invoked whenever a new message is received
624 @param cb: callback function to invoke with message data
625 instance, i.e. fn(data). If callback args is set, they will
626 be passed in as the second argument.
627 @type cb: fn(msg)
628 @param cb_cargs: additional arguments to pass to callback
629 @type cb_cargs: Any
630 """
631 if self.closed:
632 raise ROSException("subscriber [%s] has been closed"%(self.resolved_name))
633 with self.c_lock:
634
635
636 new_callbacks = self.callbacks[:]
637 new_callbacks.append((cb, cb_args))
638 self.callbacks = new_callbacks
639
640
641 for c in self.connections:
642 if c.latch is not None:
643 self._invoke_callback(c.latch, cb, cb_args)
644
646 """
647 Unregister a message callback.
648 @param cb: callback function
649 @type cb: fn(msg)
650 @param cb_cargs: additional arguments associated with callback
651 @type cb_cargs: Any
652 @raise KeyError: if no matching callback
653 """
654 if self.closed:
655 return
656 with self.c_lock:
657
658
659 matches = [x for x in self.callbacks if x[0] == cb and x[1] == cb_args]
660 if matches:
661 new_callbacks = self.callbacks[:]
662
663 new_callbacks.remove(matches[0])
664 self.callbacks = new_callbacks
665 if not matches:
666 raise KeyError("no matching cb")
667
669 """
670 Invoke callback on msg. Traps and logs any exceptions raise by callback
671 @param msg: message data
672 @type msg: L{Message}
673 @param cb: callback
674 @type cb: fn(msg, cb_args)
675 @param cb_args: callback args or None
676 @type cb_args: Any
677 """
678 try:
679 if cb_args is not None:
680 cb(msg, cb_args)
681 else:
682 cb(msg)
683 except Exception as e:
684 if not is_shutdown():
685 logerr("bad callback: %s\n%s"%(cb, traceback.format_exc()))
686 else:
687 _logger.warn("during shutdown, bad callback: %s\n%s"%(cb, traceback.format_exc()))
688
690 """
691 Called by underlying connection transport for each new message received
692 @param msgs: message data
693 @type msgs: [L{Message}]
694 """
695
696 callbacks = self.callbacks
697 for msg in msgs:
698 for cb, cb_args in callbacks:
699 self._invoke_callback(msg, cb, cb_args)
700
702 """
703 Callback API to receive notifications when new subscribers
704 connect and disconnect.
705 """
706
708 """
709 callback when a peer has subscribed from a topic
710 @param topic_name: topic name. NOTE: topic name will be resolved/remapped
711 @type topic_name: str
712 @param topic_publish: method to publish message data to all subscribers
713 @type topic_publish: fn(data)
714 @param peer_publish: method to publish message data to
715 new subscriber. NOTE: behavior for the latter is
716 transport-dependent as some transports may be broadcast only.
717 @type peer_publish: fn(data)
718 """
719 pass
720
722 """
723 callback when a peer has unsubscribed from a topic
724 @param topic_name: topic name. NOTE: topic name will be resolved/remapped
725 @type topic_name: str
726 @param num_peers: number of remaining peers subscribed to topic
727 @type num_peers: int
728 """
729 pass
730
731
732
733
734
735
736
738 """
739 Class for registering as a publisher of a ROS topic.
740 """
741
742 - def __init__(self, name, data_class, subscriber_listener=None, tcp_nodelay=False, latch=False, headers=None, queue_size=None):
743 """
744 Constructor
745 @param name: resource name of topic, e.g. 'laser'.
746 @type name: str
747 @param data_class: message class for serialization
748 @type data_class: L{Message} class
749 @param subscriber_listener: listener for
750 subscription events. May be None.
751 @type subscriber_listener: L{SubscribeListener}
752 @param tcp_nodelay: If True, sets TCP_NODELAY on
753 publisher's socket (disables Nagle algorithm). This results
754 in lower latency publishing at the cost of efficiency.
755 @type tcp_nodelay: bool
756 @param latch: If True, the last message published is
757 'latched', meaning that any future subscribers will be sent
758 that message immediately upon connection.
759 @type latch: bool
760 @raise ROSException: if parameters are invalid
761 """
762 super(Publisher, self).__init__(name, data_class, Registration.PUB)
763
764 if subscriber_listener:
765 self.impl.add_subscriber_listener(subscriber_listener)
766 if tcp_nodelay:
767 get_tcpros_handler().set_tcp_nodelay(self.resolved_name, tcp_nodelay)
768 if latch:
769 self.impl.enable_latch()
770 if headers:
771 self.impl.add_headers(headers)
772 if queue_size is not None:
773 self.impl.set_queue_size(queue_size)
774
776 """
777 Publish message data object to this topic.
778 Publish can either be called with the message instance to
779 publish or with the constructor args for a new Message
780 instance, i.e.::
781 pub.publish(message_instance)
782 pub.publish(message_field_1, message_field_2...)
783 pub.publish(message_field_1='foo', message_field_2='bar')
784
785 @param args : L{Message} instance, message arguments, or no args if keyword arguments are used
786 @param kwds : Message keyword arguments. If kwds are used, args must be unset
787 @raise ROSException: If rospy node has not been initialized
788 @raise ROSSerializationException: If unable to serialize
789 message. This is usually a type error with one of the fields.
790 """
791 if self.impl is None:
792 raise ROSException("publish() to an unregistered() handle")
793 if not is_initialized():
794 raise ROSException("ROS node has not been initialized yet. Please call init_node() first")
795 data = args_kwds_to_message(self.data_class, args, kwds)
796 try:
797 self.impl.acquire()
798 self.impl.publish(data)
799 except genpy.SerializationError as e:
800
801 _logger.error(traceback.format_exc(e))
802 raise ROSSerializationException(str(e))
803 finally:
804 self.impl.release()
805
807 """
808 Underyling L{_TopicImpl} implementation for publishers.
809 """
810
812 """
813 @param name: name of topic, e.g. 'laser'.
814 @type name: str
815 @param data_class: Message data class
816 @type data_class: L{Message} class
817 """
818 super(_PublisherImpl, self).__init__(name, data_class)
819 if python3 == 0:
820 self.buff = StringIO()
821 else:
822 self.buff = BytesIO()
823 self.publock = threading.RLock()
824 self.subscriber_listeners = []
825
826
827 self.headers = {}
828
829
830 self.is_latch = False
831 self.latch = None
832
833
834 self.queue_size = None
835
836
837 self.message_data_sent = 0
838
840 """close I/O and release resources"""
841 _TopicImpl.close(self)
842
843 if self.subscriber_listeners:
844 del self.subscriber_listeners[:]
845 if self.headers:
846 self.headers.clear()
847 if self.buff is not None:
848 self.buff.close()
849 self.publock = self.headers = self.buff = self.subscriber_listeners = None
850
852 """
853 Add connection headers to this Topic for future connections.
854 @param headers: key/values will be added to current connection
855 header set, overriding any existing keys if they conflict.
856 @type headers: dict
857 """
858 self.headers.update(headers)
859
861 """
862 Enable publish() latch. The latch contains the last published
863 message and is sent to any new subscribers.
864 """
865 self.is_latch = True
866
868 self.queue_size = queue_size
869
871 """
872 Get the stats for this topic publisher
873 @return: stats for topic in getBusStats() publisher format::
874 [topicName, messageDataBytes, connStats],
875 where connStats is::
876 [id, bytes, numMessages, connected]*
877 @rtype: list
878 """
879
880 conn = self.connections
881 return (self.resolved_name, self.message_data_sent,
882 [(c.id, c.stat_bytes, c.stat_num_msg, not c.done) for c in conn] )
883
885 """
886 Add a L{SubscribeListener} for subscribe events.
887 @param l: listener instance
888 @type l: L{SubscribeListener}
889 """
890 self.subscriber_listeners.append(l)
891
893 """lock for thread-safe publishing to this transport"""
894 if self.publock is not None:
895 self.publock.acquire()
896
898 """lock for thread-safe publishing to this transport"""
899 if self.publock is not None:
900 self.publock.release()
901
903 """
904 Add a connection to this topic. This must be a PubTransport. If
905 the latch is enabled, c will be sent a the value of the
906 latch.
907 @param c: connection instance
908 @type c: L{Transport}
909 @return: True if connection was added
910 @rtype: bool
911 """
912 if self.queue_size is not None:
913 c = QueuedConnection(c, self.queue_size)
914 super(_PublisherImpl, self).add_connection(c)
915 def publish_single(data):
916 self.publish(data, connection_override=c)
917 for l in self.subscriber_listeners:
918 l.peer_subscribe(self.resolved_name, self.publish, publish_single)
919 if self.is_latch and self.latch is not None:
920 with self.publock:
921 self.publish(self.latch, connection_override=c)
922 return True
923
925 """
926 Remove existing connection from this topic.
927 @param c: connection instance to remove
928 @type c: L{Transport}
929 """
930 super(_PublisherImpl, self).remove_connection(c)
931 num = len(self.connections)
932 for l in self.subscriber_listeners:
933 l.peer_unsubscribe(self.resolved_name, num)
934
935 - def publish(self, message, connection_override=None):
936 """
937 Publish the data to the topic. If the topic has no subscribers,
938 the method will return without any affect. Access to publish()
939 should be locked using acquire() and release() in order to
940 ensure proper message publish ordering.
941
942 @param message: message data instance to publish
943 @type message: L{Message}
944 @param connection_override: publish to this connection instead of all
945 @type connection_override: L{Transport}
946 @return: True if the data was published, False otherwise.
947 @rtype: bool
948 @raise genpy.SerializationError: if L{Message} instance is unable to serialize itself
949 @raise rospy.ROSException: if topic has been closed or was closed during publish()
950 """
951
952
953 if self.closed:
954
955
956 if not is_shutdown():
957 raise ROSException("publish() to a closed topic")
958 else:
959 return
960
961 if self.is_latch:
962 self.latch = message
963
964 if not self.has_connections():
965
966 return False
967
968 if connection_override is None:
969
970 conns = self.connections
971 else:
972 conns = [connection_override]
973
974
975
976 b = self.buff
977 try:
978 b.tell()
979
980
981 self.seq += 1
982 serialize_message(b, self.seq, message)
983
984
985 err_con = []
986 data = b.getvalue()
987
988 for c in conns:
989 try:
990 if not is_shutdown():
991 c.write_data(data)
992 except TransportTerminated as e:
993 logdebug("publisher connection to [%s] terminated, see errorlog for details:\n%s"%(c.endpoint_id, traceback.format_exc()))
994 err_con.append(c)
995 except Exception as e:
996
997 logdebug("publisher connection to [%s] terminated, see errorlog for details:\n%s"%(c.endpoint_id, traceback.format_exc()))
998 err_con.append(c)
999
1000
1001 self.message_data_sent += b.tell()
1002 b.seek(0)
1003 b.truncate(0)
1004
1005 except ValueError:
1006
1007
1008
1009 if self.closed:
1010 if is_shutdown():
1011
1012
1013 return
1014 else:
1015
1016
1017 raise ROSException("topic was closed during publish()")
1018 else:
1019
1020 raise
1021
1022
1023 for c in err_con:
1024 try:
1025
1026
1027 c.close()
1028 except:
1029 pass
1030
1031
1032
1033
1035 """
1036 Tracks Topic objects
1037 See L{get_topic_manager()} for singleton access
1038 """
1039
1041 """ctor."""
1042 super(_TopicManager, self).__init__()
1043 self.pubs = {}
1044 self.subs = {}
1045 self.topics = set()
1046 self.lock = threading.Condition()
1047 self.closed = False
1048 _logger.info("topicmanager initialized")
1049
1051 """
1052 get topic publisher and subscriber connection info for getBusInfo() api
1053 @return: [bus info stats]
1054 See getBusInfo() API for more data structure details.
1055 @rtype: list
1056 """
1057 with self.lock:
1058 info = []
1059 for s in chain(iter(self.pubs.values()), iter(self.subs.values())):
1060 info.extend(s.get_stats_info())
1061 return info
1062
1064 """
1065 get topic publisher and subscriber stats for getBusStats() api
1066 @return: [publisherStats, subscriberStats].
1067 See getBusStats() API for more data structure details.
1068 @rtype: list
1069 """
1070 with self.lock:
1071 return [s.get_stats() for s in self.pubs.values()],\
1072 [s.get_stats() for s in self.subs.values()]
1073
1075 """
1076 Close all registered publication and subscriptions. Manager is
1077 no longer usable after close.
1078 """
1079 self.closed = True
1080 with self.lock:
1081 for t in chain(iter(self.pubs.values()), iter(self.subs.values())):
1082 t.close()
1083 self.pubs.clear()
1084 self.subs.clear()
1085
1086 - def _add(self, ps, rmap, reg_type):
1087 """
1088 Add L{_TopicImpl} instance to rmap
1089 @param ps: a pub/sub impl instance
1090 @type ps: L{_TopicImpl}
1091 @param rmap: { topic: _TopicImpl} rmap to record instance in
1092 @type rmap: dict
1093 @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB}
1094 @type reg_type: str
1095 """
1096 resolved_name = ps.resolved_name
1097 _logger.debug("tm._add: %s, %s, %s", resolved_name, ps.type, reg_type)
1098 with self.lock:
1099 rmap[resolved_name] = ps
1100 self.topics.add(resolved_name)
1101
1102
1103
1104 get_registration_listeners().notify_added(resolved_name, ps.type, reg_type)
1105
1107 """recalculate self.topics. expensive"""
1108 self.topics = set([x.resolved_name for x in self.pubs.values()] +
1109 [x.resolved_name for x in self.subs.values()])
1110
1111 - def _remove(self, ps, rmap, reg_type):
1112 """
1113 Remove L{_TopicImpl} instance from rmap
1114 @param ps: a pub/sub impl instance
1115 @type ps: L{_TopicImpl}
1116 @param rmap: topic->_TopicImpl rmap to remove instance in
1117 @type rmap: dict
1118 @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB}
1119 @type reg_type: str
1120 """
1121 resolved_name = ps.resolved_name
1122 _logger.debug("tm._remove: %s, %s, %s", resolved_name, ps.type, reg_type)
1123 with self.lock:
1124 del rmap[resolved_name]
1125 self. _recalculate_topics()
1126
1127
1128
1129 get_registration_listeners().notify_removed(resolved_name, ps.type, reg_type)
1130
1131 - def get_impl(self, reg_type, resolved_name):
1132 """
1133 Get the L{_TopicImpl} for the specified topic. This is mainly for
1134 testing purposes. Unlike acquire_impl, it does not alter the
1135 ref count.
1136 @param resolved_name: resolved topic name
1137 @type resolved_name: str
1138 @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB}
1139 @type reg_type: str
1140 """
1141 if reg_type == Registration.PUB:
1142 rmap = self.pubs
1143 elif reg_type == Registration.SUB:
1144 rmap = self.subs
1145 else:
1146 raise TypeError("invalid reg_type: %s"%s)
1147 return rmap.get(resolved_name, None)
1148
1149 - def acquire_impl(self, reg_type, resolved_name, data_class):
1150 """
1151 Acquire a L{_TopicImpl} for the specified topic (create one if it
1152 doesn't exist). Every L{Topic} instance has a _TopicImpl that
1153 actually controls the topic resources so that multiple Topic
1154 instances use the same underlying connections. 'Acquiring' a
1155 topic implementation marks that another Topic instance is
1156 using the TopicImpl.
1157
1158 @param resolved_name: resolved topic name
1159 @type resolved_name: str
1160
1161 @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB}
1162 @type reg_type: str
1163
1164 @param data_class: message class for topic
1165 @type data_class: L{Message} class
1166 """
1167 if reg_type == Registration.PUB:
1168 rmap = self.pubs
1169 impl_class = _PublisherImpl
1170 elif reg_type == Registration.SUB:
1171 rmap = self.subs
1172 impl_class = _SubscriberImpl
1173 else:
1174 raise TypeError("invalid reg_type: %s"%s)
1175 with self.lock:
1176 impl = rmap.get(resolved_name, None)
1177 if not impl:
1178 impl = impl_class(resolved_name, data_class)
1179 self._add(impl, rmap, reg_type)
1180 impl.ref_count += 1
1181 return impl
1182
1184 """
1185 Release a L_{TopicImpl} for the specified topic.
1186
1187 Every L{Topic} instance has a _TopicImpl that actually
1188 controls the topic resources so that multiple Topic instances
1189 use the same underlying connections. 'Acquiring' a topic
1190 implementation marks that another Topic instance is using the
1191 TopicImpl.
1192
1193 @param resolved_name: resolved topic name
1194 @type resolved_name: str
1195 @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB}
1196 @type reg_type: str
1197 """
1198 if reg_type == Registration.PUB:
1199 rmap = self.pubs
1200 else:
1201 rmap = self.subs
1202 with self.lock:
1203
1204 if self.closed:
1205 return
1206 impl = rmap.get(resolved_name, None)
1207 assert impl is not None, "cannot release topic impl as impl [%s] does not exist"%resolved_name
1208 impl.ref_count -= 1
1209 assert impl.ref_count >= 0, "topic impl's reference count has gone below zero"
1210 if impl.ref_count == 0:
1211 rospyinfo("topic impl's ref count is zero, deleting topic %s...", resolved_name)
1212 impl.close()
1213 self._remove(impl, rmap, reg_type)
1214 del impl
1215 _logger.debug("... done deleting topic %s", resolved_name)
1216
1218 """
1219 @param resolved_name: resolved topic name
1220 @type resolved_name: str
1221 @return: list of L{_PublisherImpl}s
1222 @rtype: [L{_PublisherImpl}]
1223 """
1224 return self.pubs.get(resolved_name, None)
1225
1227 """
1228 @param resolved_name: topic name
1229 @type resolved_name: str
1230 @return: subscriber for the specified topic.
1231 @rtype: L{_SubscriberImpl}
1232 """
1233 return self.subs.get(resolved_name, None)
1234
1236 """
1237 @param resolved_name: resolved topic name
1238 @type resolved_name: str
1239 @return: True if manager has subscription for specified topic
1240 @rtype: bool
1241 """
1242 return resolved_name in self.subs
1243
1245 """
1246 @param resolved_name: resolved topic name
1247 @type resolved_name: str
1248 @return: True if manager has publication for specified topic
1249 @rtype: bool
1250 """
1251 return resolved_name in self.pubs
1252
1254 """
1255 @return: list of topic names this node subscribes to/publishes
1256 @rtype: [str]
1257 """
1258 return self.topics
1259
1261 return [[k, v.type] for k, v in rmap.items()]
1262
1263
1265 return self._get_list(self.subs)
1266
1267
1269 return self._get_list(self.pubs)
1270
1271 set_topic_manager(_TopicManager())
1272