$search
00001 # Software License Agreement (BSD License) 00002 # 00003 # Copyright (c) 2010, Willow Garage, Inc. 00004 # All rights reserved. 00005 # 00006 # Redistribution and use in source and binary forms, with or without 00007 # modification, are permitted provided that the following conditions 00008 # are met: 00009 # 00010 # * Redistributions of source code must retain the above copyright 00011 # notice, this list of conditions and the following disclaimer. 00012 # * Redistributions in binary form must reproduce the above 00013 # copyright notice, this list of conditions and the following 00014 # disclaimer in the documentation and/or other materials provided 00015 # with the distribution. 00016 # * Neither the name of Willow Garage, Inc. nor the names of its 00017 # contributors may be used to endorse or promote products derived 00018 # from this software without specific prior written permission. 00019 # 00020 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 00021 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 00022 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 00023 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 00024 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 00025 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 00026 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00027 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 00028 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 00029 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 00030 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00031 # POSSIBILITY OF SUCH DAMAGE. 00032 # 00033 # Revision $Id: __init__.py 10467 2010-07-21 22:20:07Z kwc $ 00034 00035 """Replacement for default rospy TCPROSHandler""" 00036 00037 #TODO: this should be a near drop-in replacement for the existing 00038 #handler as long as the topic_manager is passed into the 00039 #constructor. This modified implementation assumes that the TCPROS 00040 #server is already started. In the past, we lazy initialized the 00041 #server, but this optimization cannot be used in practice so we should 00042 #simplify. 00043 00044 import rospy 00045 from rospy.core import rospyerr 00046 import rospy.exceptions 00047 import rospy.names 00048 import rospy.impl.transport 00049 from rospy.impl.tcpros_pubsub import TCPROSPub, TCPROSSub, TCPROSTransport 00050 from rospy.impl.tcpros_base import TCPROS 00051 00052 class ProxyTCPROSHandler(rospy.impl.transport.ProtocolHandler): 00053 """ 00054 ROS Protocol handler for TCPROS. Accepts both TCPROS topic 00055 connections as well as ROS service connections over TCP. TCP server 00056 socket is run once start_server() is called -- this is implicitly 00057 called during init_publisher(). 00058 """ 00059 00060 def __init__(self, topic_manager, tcpros_server): 00061 """ctor""" 00062 self.tcp_nodelay_map = {} # { topic : tcp_nodelay} 00063 self.topic_manager = topic_manager 00064 self.tcpros_server = tcpros_server 00065 00066 def set_tcp_nodelay(self, resolved_name, tcp_nodelay): 00067 """ 00068 @param resolved_name: resolved topic name 00069 @type resolved_name: str 00070 00071 @param tcp_nodelay: If True, sets TCP_NODELAY on publisher's 00072 socket (disables Nagle algorithm). This results in lower 00073 latency publishing at the cost of efficiency. 00074 @type tcp_nodelay: bool 00075 """ 00076 self.tcp_nodelay_map[resolved_name] = tcp_nodelay 00077 00078 def shutdown(self): 00079 """ 00080 stops the TCP/IP server responsible for receiving inbound connections 00081 """ 00082 pass 00083 00084 def create_transport(self, resolved_name, pub_uri, protocol_params): 00085 """ 00086 Connect to topic resolved_name on Publisher pub_uri using TCPROS. 00087 @param resolved_name str: resolved topic name 00088 @type resolved_name: str 00089 @param pub_uri: XML-RPC URI of publisher 00090 @type pub_uri: str 00091 @param protocol_params: protocol parameters to use for connecting 00092 @type protocol_params: [XmlRpcLegal] 00093 @return: code, message, debug 00094 @rtype: (int, str, int) 00095 """ 00096 00097 #Validate protocol params = [TCPROS, address, port] 00098 if type(protocol_params) != list or len(protocol_params) != 3: 00099 return 0, "ERROR: invalid TCPROS parameters", 0 00100 if protocol_params[0] != TCPROS: 00101 return 0, "INTERNAL ERROR: protocol id is not TCPROS: %s"%id, 0 00102 id, dest_addr, dest_port = protocol_params 00103 00104 sub = self.topic_manager.get_subscriber_impl(resolved_name) 00105 00106 #Create connection 00107 try: 00108 protocol = TCPROSSub(resolved_name, sub.data_class, \ 00109 queue_size=sub.queue_size, buff_size=sub.buff_size, 00110 tcp_nodelay=sub.tcp_nodelay) 00111 conn = TCPROSTransport(protocol, resolved_name) 00112 # timeout is really generous. for now just choosing one that is large but not infinite 00113 conn.connect(dest_addr, dest_port, pub_uri, timeout=60.) 00114 t = threading.Thread(name=resolved_name, target=conn.receive_loop, args=(sub.receive_callback,)) 00115 # don't enable this just yet, need to work on this logic 00116 #rospy.core._add_shutdown_thread(t) 00117 t.start() 00118 except rospy.exceptions.TransportInitError, e: 00119 rospyerr("unable to create subscriber transport: %s", e) 00120 return 0, "Internal error creating inbound TCP connection for [%s]: %s"%(resolved_name, e), -1 00121 00122 # Attach connection to _SubscriberImpl 00123 if sub.add_connection(conn): #pass tcp connection to handler 00124 return 1, "Connected topic[%s]. Transport impl[%s]"%(resolved_name, conn.__class__.__name__), dest_port 00125 else: 00126 conn.close() 00127 return 0, "ERROR: Race condition failure: duplicate topic subscriber [%s] was created"%(resolved_name), 0 00128 00129 def supports(self, protocol): 00130 """ 00131 @param protocol: name of protocol 00132 @type protocol: str 00133 @return: True if protocol is supported 00134 @rtype: bool 00135 """ 00136 return protocol == TCPROS 00137 00138 def get_supported(self): 00139 """ 00140 Get supported protocols 00141 """ 00142 return [[TCPROS]] 00143 00144 def init_publisher(self, resolved_name, protocol): 00145 """ 00146 Initialize this node to receive an inbound TCP connection, 00147 i.e. startup a TCP server if one is not already running. 00148 00149 @param resolved_name: topic name 00150 @type resolved__name: str 00151 00152 @param protocol: negotiated protocol 00153 parameters. protocol[0] must be the string 'TCPROS' 00154 @type protocol: [str, value*] 00155 @return: (code, msg, [TCPROS, addr, port]) 00156 @rtype: (int, str, list) 00157 """ 00158 if protocol[0] != TCPROS: 00159 return 0, "Internal error: protocol does not match TCPROS: %s"%protocol, [] 00160 00161 addr, port = self.tcpros_server.get_address() 00162 return 1, "ready on %s:%s"%(addr, port), [TCPROS, addr, port] 00163 00164 def topic_connection_handler(self, sock, client_addr, header): 00165 """ 00166 Process incoming topic connection. Reads in topic name from 00167 handshake and creates the appropriate L{TCPROSPub} handler for the 00168 connection. 00169 @param sock: socket connection 00170 @type sock: socket.socket 00171 @param client_addr: client address 00172 @type client_addr: (str, int) 00173 @param header: key/value pairs from handshake header 00174 @type header: dict 00175 @return: error string or None 00176 @rtype: str 00177 """ 00178 for required in ['topic', 'md5sum', 'callerid']: 00179 if not required in header: 00180 return "Missing required '%s' field"%required 00181 else: 00182 resolved_topic_name = header['topic'] 00183 md5sum = header['md5sum'] 00184 tm = self.topic_manager 00185 topic = tm.get_publisher_impl(resolved_topic_name) 00186 if not topic: 00187 return "[%s] is not a publisher of [%s]. Topics are %s"%(rospy.names.get_caller_id(), resolved_topic_name, tm.get_publications()) 00188 elif md5sum != rospy.names.TOPIC_ANYTYPE and md5sum != topic.data_class._md5sum: 00189 00190 actual_type = topic.data_class._type 00191 00192 # check to see if subscriber sent 'type' header. If they did, check that 00193 # types are same first as this provides a better debugging message 00194 if 'type' in header: 00195 requested_type = header['type'] 00196 if requested_type != actual_type: 00197 return "topic types do not match: [%s] vs. [%s]"%(requested_type, actual_type) 00198 else: 00199 # defaults to actual type 00200 requested_type = actual_type 00201 00202 return "Client [%s] wants topic [%s] to have datatype/md5sum [%s/%s], but our version has [%s/%s] Dropping connection."%(header['callerid'], resolved_topic_name, requested_type, md5sum, actual_type, topic.data_class._md5sum) 00203 00204 else: 00205 #TODO:POLLING if polling header is present, have to spin up receive loop as well 00206 00207 # #1334: tcp_nodelay support from subscriber option 00208 if 'tcp_nodelay' in header: 00209 tcp_nodelay = True if header['tcp_nodelay'].strip() == '1' else False 00210 else: 00211 tcp_nodelay = self.tcp_nodelay_map.get(resolved_topic_name, False) 00212 00213 rospy.impl.tcpros_pubsub._configure_pub_socket(sock, tcp_nodelay) 00214 protocol = TCPROSPub(resolved_topic_name, topic.data_class, is_latch=topic.is_latch, headers=topic.headers) 00215 transport = TCPROSTransport(protocol, resolved_topic_name) 00216 transport.set_socket(sock, header['callerid']) 00217 transport.write_header() 00218 topic.add_connection(transport) 00219 00220