Package rospy :: Package impl :: Module tcpros_pubsub

Source Code for Module rospy.impl.tcpros_pubsub

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2008, Willow Garage, Inc. 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32  # 
 33  # Revision $Id: tcpros_pubsub.py 13838 2011-06-01 01:20:31Z kwc $ 
 34   
 35  """Internal use: Topic-specific extensions for TCPROS support""" 
 36   
 37  import socket 
 38  import threading 
 39  import time 
 40   
 41  from rospy.core import logwarn, logerr, logdebug, rospyerr 
 42  import rospy.exceptions 
 43  import rospy.names 
 44   
 45  import rospy.impl.registration 
 46  import rospy.impl.transport 
 47   
 48  from rospy.impl.tcpros_base import TCPROSTransport, TCPROSTransportProtocol, \ 
 49      get_tcpros_server_address, start_tcpros_server,\ 
 50      DEFAULT_BUFF_SIZE, TCPROS 
 51   
52 -class TCPROSSub(TCPROSTransportProtocol):
53 """ 54 Subscription transport implementation for receiving topic data via 55 peer-to-peer TCP/IP sockets 56 """ 57
58 - def __init__(self, resolved_name, recv_data_class, queue_size=None, \ 59 buff_size=DEFAULT_BUFF_SIZE, tcp_nodelay=False):
60 """ 61 ctor. 62 63 @param resolved_name: resolved subscription name 64 @type resolved_name: str 65 66 @param recv_data_class: class to instantiate to receive 67 messages 68 @type recv_data_class: L{rospy.Message} 69 70 @param queue_size: maximum number of messages to 71 deserialize from newly read data off socket 72 @type queue_size: int 73 74 @param buff_size: recv buffer size 75 @type buff_size: int 76 77 @param tcp_nodelay: If True, request TCP_NODELAY from publisher 78 @type tcp_nodelay: bool 79 """ 80 super(TCPROSSub, self).__init__(resolved_name, recv_data_class, queue_size, buff_size) 81 self.direction = rospy.impl.transport.INBOUND 82 self.tcp_nodelay = tcp_nodelay
83
84 - def get_header_fields(self):
85 """ 86 @return: dictionary of subscriber fields 87 @rtype: dict 88 """ 89 return {'topic': self.resolved_name, 90 'message_definition': self.recv_data_class._full_text, 91 'tcp_nodelay': '1' if self.tcp_nodelay else '0', 92 'md5sum': self.recv_data_class._md5sum, 93 'type': self.recv_data_class._type, 94 'callerid': rospy.names.get_caller_id()}
95 96 # Separate method for easier testing
97 -def _configure_pub_socket(sock, is_tcp_nodelay):
98 """ 99 Configure socket options on a new publisher socket. 100 @param sock: socket.socket 101 @type sock: socket.socket 102 @param is_tcp_nodelay: if True, TCP_NODELAY will be set on outgoing socket if available 103 @param is_tcp_nodelay: bool 104 """ 105 # #956: low latency, TCP_NODELAY support 106 if is_tcp_nodelay: 107 if hasattr(socket, 'TCP_NODELAY'): 108 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) 109 else: 110 logwarn("WARNING: cannot enable TCP_NODELAY as its not supported on this platform")
111 112 #TODO:POLLING: TCPROSPub currently doesn't actually do anything -- not until polling is implemented 113
114 -class TCPROSPub(TCPROSTransportProtocol):
115 """ 116 Publisher transport implementation for publishing topic data via 117 peer-to-peer TCP/IP sockets. 118 """ 119
120 - def __init__(self, resolved_name, pub_data_class, is_latch=False, headers=None):
121 """ 122 ctor. 123 @param resolved_name: resolved topic name 124 @type resolved_name: str 125 @param pub_data_class: class to instance to receive messages 126 @type pub_data_class: L{rospy.Message} class 127 @param is_latch: If True, Publisher is latching 128 @type is_latch: bool 129 """ 130 # very small buffer size for publishers as the messages they receive are very small 131 super(TCPROSPub, self).__init__(resolved_name, None, queue_size=None, buff_size=128) 132 self.pub_data_class = pub_data_class 133 self.direction = rospy.impl.transport.OUTBOUND 134 self.is_latch = is_latch 135 self.headers = headers if headers else {}
136
137 - def get_header_fields(self):
138 base = {'topic': self.resolved_name, 139 'type': self.pub_data_class._type, 140 'latching': '1' if self.is_latch else '0', 141 'message_definition': self.pub_data_class._full_text, 142 'md5sum': self.pub_data_class._md5sum, 143 'callerid': rospy.names.get_caller_id() } 144 145 # this implementation allows the user to override builtin 146 # fields. this could potentially enable some interesting 147 # features... or it could be really bad. 148 if self.headers: 149 base.update(self.headers) 150 return base
151
152 -def robust_connect_subscriber(conn, dest_addr, dest_port, pub_uri, receive_cb):
153 """ 154 Keeps trying to create connection for subscriber. Then passes off to receive_loop once connected. 155 """ 156 # kwc: this logic is not very elegant. I am waiting to rewrite 157 # the I/O loop with async i/o to clean this up. 158 159 # timeout is really generous. for now just choosing one that is large but not infinite 160 interval = 0.5 161 while conn.socket is None and not conn.done and not rospy.is_shutdown(): 162 try: 163 conn.connect(dest_addr, dest_port, pub_uri, timeout=60.) 164 except rospy.exceptions.TransportInitError as e: 165 rospyerr("unable to create subscriber transport: %s. Will try again in %ss", e, interval) 166 interval = interval * 2 167 time.sleep(interval) 168 169 conn.receive_loop(receive_cb)
170
171 -class TCPROSHandler(rospy.impl.transport.ProtocolHandler):
172 """ 173 ROS Protocol handler for TCPROS. Accepts both TCPROS topic 174 connections as well as ROS service connections over TCP. TCP server 175 socket is run once start_server() is called -- this is implicitly 176 called during init_publisher(). 177 """ 178
179 - def __init__(self):
180 """ctor""" 181 self.tcp_nodelay_map = {} # { topic : tcp_nodelay}
182
183 - def set_tcp_nodelay(self, resolved_name, tcp_nodelay):
184 """ 185 @param resolved_name: resolved topic name 186 @type resolved_name: str 187 188 @param tcp_nodelay: If True, sets TCP_NODELAY on publisher's 189 socket (disables Nagle algorithm). This results in lower 190 latency publishing at the cost of efficiency. 191 @type tcp_nodelay: bool 192 """ 193 self.tcp_nodelay_map[resolved_name] = tcp_nodelay
194
195 - def shutdown(self):
196 """ 197 stops the TCP/IP server responsible for receiving inbound connections 198 """ 199 pass
200
201 - def create_transport(self, resolved_name, pub_uri, protocol_params):
202 """ 203 Connect to topic resolved_name on Publisher pub_uri using TCPROS. 204 @param resolved_name str: resolved topic name 205 @type resolved_name: str 206 @param pub_uri: XML-RPC URI of publisher 207 @type pub_uri: str 208 @param protocol_params: protocol parameters to use for connecting 209 @type protocol_params: [XmlRpcLegal] 210 @return: code, message, debug 211 @rtype: (int, str, int) 212 """ 213 214 #Validate protocol params = [TCPROS, address, port] 215 if type(protocol_params) != list or len(protocol_params) != 3: 216 return 0, "ERROR: invalid TCPROS parameters", 0 217 if protocol_params[0] != TCPROS: 218 return 0, "INTERNAL ERROR: protocol id is not TCPROS: %s"%id, 0 219 id, dest_addr, dest_port = protocol_params 220 221 sub = rospy.impl.registration.get_topic_manager().get_subscriber_impl(resolved_name) 222 223 #Create connection 224 protocol = TCPROSSub(resolved_name, sub.data_class, \ 225 queue_size=sub.queue_size, buff_size=sub.buff_size, 226 tcp_nodelay=sub.tcp_nodelay) 227 conn = TCPROSTransport(protocol, resolved_name) 228 t = threading.Thread(name=resolved_name, target=robust_connect_subscriber, args=(conn, dest_addr, dest_port, pub_uri, sub.receive_callback,)) 229 # don't enable this just yet, need to work on this logic 230 #rospy.core._add_shutdown_thread(t) 231 t.start() 232 233 # Attach connection to _SubscriberImpl 234 if sub.add_connection(conn): #pass tcp connection to handler 235 return 1, "Connected topic[%s]. Transport impl[%s]"%(resolved_name, conn.__class__.__name__), dest_port 236 else: 237 conn.close() 238 return 0, "ERROR: Race condition failure: duplicate topic subscriber [%s] was created"%(resolved_name), 0
239
240 - def supports(self, protocol):
241 """ 242 @param protocol: name of protocol 243 @type protocol: str 244 @return: True if protocol is supported 245 @rtype: bool 246 """ 247 return protocol == TCPROS
248
249 - def get_supported(self):
250 """ 251 Get supported protocols 252 """ 253 return [[TCPROS]]
254
255 - def init_publisher(self, resolved_name, protocol):
256 """ 257 Initialize this node to receive an inbound TCP connection, 258 i.e. startup a TCP server if one is not already running. 259 260 @param resolved_name: topic name 261 @type resolved__name: str 262 263 @param protocol: negotiated protocol 264 parameters. protocol[0] must be the string 'TCPROS' 265 @type protocol: [str, value*] 266 @return: (code, msg, [TCPROS, addr, port]) 267 @rtype: (int, str, list) 268 """ 269 if protocol[0] != TCPROS: 270 return 0, "Internal error: protocol does not match TCPROS: %s"%protocol, [] 271 start_tcpros_server() 272 addr, port = get_tcpros_server_address() 273 return 1, "ready on %s:%s"%(addr, port), [TCPROS, addr, port]
274
275 - def topic_connection_handler(self, sock, client_addr, header):
276 """ 277 Process incoming topic connection. Reads in topic name from 278 handshake and creates the appropriate L{TCPROSPub} handler for the 279 connection. 280 @param sock: socket connection 281 @type sock: socket.socket 282 @param client_addr: client address 283 @type client_addr: (str, int) 284 @param header: key/value pairs from handshake header 285 @type header: dict 286 @return: error string or None 287 @rtype: str 288 """ 289 if rospy.core.is_shutdown_requested(): 290 return "Node is shutting down" 291 for required in ['topic', 'md5sum', 'callerid']: 292 if not required in header: 293 return "Missing required '%s' field"%required 294 else: 295 resolved_topic_name = header['topic'] 296 md5sum = header['md5sum'] 297 tm = rospy.impl.registration.get_topic_manager() 298 topic = tm.get_publisher_impl(resolved_topic_name) 299 if not topic: 300 return "[%s] is not a publisher of [%s]. Topics are %s"%(rospy.names.get_caller_id(), resolved_topic_name, tm.get_publications()) 301 elif not topic.data_class or topic.closed: 302 return "Internal error processing topic [%s]"%(resolved_topic_name) 303 elif md5sum != rospy.names.TOPIC_ANYTYPE and md5sum != topic.data_class._md5sum: 304 data_class = topic.data_class 305 actual_type = data_class._type 306 307 # check to see if subscriber sent 'type' header. If they did, check that 308 # types are same first as this provides a better debugging message 309 if 'type' in header: 310 requested_type = header['type'] 311 if requested_type != actual_type: 312 return "topic types do not match: [%s] vs. [%s]"%(requested_type, actual_type) 313 else: 314 # defaults to actual type 315 requested_type = actual_type 316 317 return "Client [%s] wants topic [%s] to have datatype/md5sum [%s/%s], but our version has [%s/%s] Dropping connection."%(header['callerid'], resolved_topic_name, requested_type, md5sum, actual_type, data_class._md5sum) 318 319 else: 320 #TODO:POLLING if polling header is present, have to spin up receive loop as well 321 322 # #1334: tcp_nodelay support from subscriber option 323 if 'tcp_nodelay' in header: 324 tcp_nodelay = True if header['tcp_nodelay'].strip() == '1' else False 325 else: 326 tcp_nodelay = self.tcp_nodelay_map.get(resolved_topic_name, False) 327 328 _configure_pub_socket(sock, tcp_nodelay) 329 protocol = TCPROSPub(resolved_topic_name, topic.data_class, is_latch=topic.is_latch, headers=topic.headers) 330 transport = TCPROSTransport(protocol, resolved_topic_name) 331 transport.set_socket(sock, header['callerid']) 332 transport.write_header() 333 topic.add_connection(transport)
334