tcpros.py
Go to the documentation of this file.
00001 # Software License Agreement (BSD License)
00002 #
00003 # Copyright (c) 2010, Willow Garage, Inc.
00004 # All rights reserved.
00005 #
00006 # Redistribution and use in source and binary forms, with or without
00007 # modification, are permitted provided that the following conditions
00008 # are met:
00009 #
00010 #  * Redistributions of source code must retain the above copyright
00011 #    notice, this list of conditions and the following disclaimer.
00012 #  * Redistributions in binary form must reproduce the above
00013 #    copyright notice, this list of conditions and the following
00014 #    disclaimer in the documentation and/or other materials provided
00015 #    with the distribution.
00016 #  * Neither the name of Willow Garage, Inc. nor the names of its
00017 #    contributors may be used to endorse or promote products derived
00018 #    from this software without specific prior written permission.
00019 #
00020 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00021 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00022 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00023 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00024 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00025 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00026 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00027 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00028 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00029 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00030 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00031 # POSSIBILITY OF SUCH DAMAGE.
00032 #
00033 # Revision $Id: __init__.py 10467 2010-07-21 22:20:07Z kwc $
00034 
00035 """Replacement for default rospy TCPROSHandler"""
00036 
00037 #TODO: this should be a near drop-in replacement for the existing
00038 #handler as long as the topic_manager is passed into the
00039 #constructor. This modified implementation assumes that the TCPROS
00040 #server is already started. In the past, we lazy initialized the
00041 #server, but this optimization cannot be used in practice so we should
00042 #simplify.
00043 
00044 import rospy
00045 from rospy.core import rospyerr
00046 import rospy.exceptions
00047 import rospy.names
00048 import rospy.impl.transport
00049 from rospy.impl.tcpros_pubsub import TCPROSPub, TCPROSSub, TCPROSTransport
00050 from rospy.impl.tcpros_base import TCPROS
00051 
00052 class ProxyTCPROSHandler(rospy.impl.transport.ProtocolHandler):
00053     """
00054     ROS Protocol handler for TCPROS. Accepts both TCPROS topic
00055     connections as well as ROS service connections over TCP. TCP server
00056     socket is run once start_server() is called -- this is implicitly
00057     called during init_publisher().
00058     """
00059 
00060     def __init__(self, topic_manager, tcpros_server):
00061         """ctor"""
00062         self.tcp_nodelay_map = {} # { topic : tcp_nodelay}
00063         self.topic_manager = topic_manager
00064         self.tcpros_server = tcpros_server
00065     
00066     def set_tcp_nodelay(self, resolved_name, tcp_nodelay):
00067         """
00068         @param resolved_name: resolved topic name
00069         @type  resolved_name: str
00070 
00071         @param tcp_nodelay: If True, sets TCP_NODELAY on publisher's
00072         socket (disables Nagle algorithm). This results in lower
00073         latency publishing at the cost of efficiency.
00074         @type  tcp_nodelay: bool
00075         """
00076         self.tcp_nodelay_map[resolved_name] = tcp_nodelay
00077 
00078     def shutdown(self):
00079         """
00080         stops the TCP/IP server responsible for receiving inbound connections        
00081         """
00082         pass
00083 
00084     def create_transport(self, resolved_name, pub_uri, protocol_params):
00085         """
00086         Connect to topic resolved_name on Publisher pub_uri using TCPROS.
00087         @param resolved_name str: resolved topic name
00088         @type  resolved_name: str
00089         @param pub_uri: XML-RPC URI of publisher 
00090         @type  pub_uri: str
00091         @param protocol_params: protocol parameters to use for connecting
00092         @type protocol_params: [XmlRpcLegal]
00093         @return: code, message, debug
00094         @rtype: (int, str, int)
00095         """
00096         
00097         #Validate protocol params = [TCPROS, address, port]
00098         if type(protocol_params) != list or len(protocol_params) != 3:
00099             return 0, "ERROR: invalid TCPROS parameters", 0
00100         if protocol_params[0] != TCPROS:
00101             return 0, "INTERNAL ERROR: protocol id is not TCPROS: %s"%id, 0
00102         id, dest_addr, dest_port = protocol_params
00103 
00104         sub = self.topic_manager.get_subscriber_impl(resolved_name)
00105 
00106         #Create connection 
00107         try:
00108             protocol = TCPROSSub(resolved_name, sub.data_class, \
00109                                  queue_size=sub.queue_size, buff_size=sub.buff_size,
00110                                  tcp_nodelay=sub.tcp_nodelay)
00111             conn = TCPROSTransport(protocol, resolved_name)
00112             # timeout is really generous. for now just choosing one that is large but not infinite
00113             conn.connect(dest_addr, dest_port, pub_uri, timeout=60.)
00114             t = threading.Thread(name=resolved_name, target=conn.receive_loop, args=(sub.receive_callback,))
00115             # don't enable this just yet, need to work on this logic
00116             #rospy.core._add_shutdown_thread(t)
00117             t.start()
00118         except rospy.exceptions.TransportInitError, e:
00119             rospyerr("unable to create subscriber transport: %s", e)
00120             return 0, "Internal error creating inbound TCP connection for [%s]: %s"%(resolved_name, e), -1
00121 
00122         # Attach connection to _SubscriberImpl
00123         if sub.add_connection(conn): #pass tcp connection to handler
00124             return 1, "Connected topic[%s]. Transport impl[%s]"%(resolved_name, conn.__class__.__name__), dest_port
00125         else:
00126             conn.close()
00127             return 0, "ERROR: Race condition failure: duplicate topic subscriber [%s] was created"%(resolved_name), 0
00128 
00129     def supports(self, protocol):
00130         """
00131         @param protocol: name of protocol
00132         @type protocol: str
00133         @return: True if protocol is supported
00134         @rtype: bool
00135         """
00136         return protocol == TCPROS
00137     
00138     def get_supported(self):
00139         """
00140         Get supported protocols
00141         """
00142         return [[TCPROS]]
00143         
00144     def init_publisher(self, resolved_name, protocol):
00145         """
00146         Initialize this node to receive an inbound TCP connection,
00147         i.e. startup a TCP server if one is not already running.
00148         
00149         @param resolved_name: topic name
00150         @type  resolved__name: str
00151         
00152         @param protocol: negotiated protocol
00153           parameters. protocol[0] must be the string 'TCPROS'
00154         @type  protocol: [str, value*]
00155         @return: (code, msg, [TCPROS, addr, port])
00156         @rtype: (int, str, list)
00157         """
00158         if protocol[0] != TCPROS:
00159             return 0, "Internal error: protocol does not match TCPROS: %s"%protocol, []
00160 
00161         addr, port = self.tcpros_server.get_address()
00162         return 1, "ready on %s:%s"%(addr, port), [TCPROS, addr, port]
00163 
00164     def topic_connection_handler(self, sock, client_addr, header):
00165         """
00166         Process incoming topic connection. Reads in topic name from
00167         handshake and creates the appropriate L{TCPROSPub} handler for the
00168         connection.
00169         @param sock: socket connection
00170         @type sock: socket.socket
00171         @param client_addr: client address
00172         @type client_addr: (str, int)
00173         @param header: key/value pairs from handshake header
00174         @type header: dict
00175         @return: error string or None 
00176         @rtype: str
00177         """
00178         for required in ['topic', 'md5sum', 'callerid']:
00179             if not required in header:
00180                 return "Missing required '%s' field"%required
00181         else:
00182             resolved_topic_name = header['topic']
00183             md5sum = header['md5sum']
00184             tm = self.topic_manager
00185             topic = tm.get_publisher_impl(resolved_topic_name)
00186             if not topic:
00187                 return "[%s] is not a publisher of  [%s]. Topics are %s"%(rospy.names.get_caller_id(), resolved_topic_name, tm.get_publications())
00188             elif md5sum != rospy.names.TOPIC_ANYTYPE and md5sum != topic.data_class._md5sum:
00189 
00190                 actual_type = topic.data_class._type
00191 
00192                 # check to see if subscriber sent 'type' header. If they did, check that
00193                 # types are same first as this provides a better debugging message
00194                 if 'type' in header:
00195                     requested_type = header['type']
00196                     if requested_type != actual_type:
00197                         return "topic types do not match: [%s] vs. [%s]"%(requested_type, actual_type)
00198                 else:
00199                     # defaults to actual type
00200                     requested_type = actual_type
00201 
00202                 return "Client [%s] wants topic [%s] to have datatype/md5sum [%s/%s], but our version has [%s/%s] Dropping connection."%(header['callerid'], resolved_topic_name, requested_type, md5sum, actual_type, topic.data_class._md5sum)
00203 
00204             else:
00205                 #TODO:POLLING if polling header is present, have to spin up receive loop as well
00206 
00207                 # #1334: tcp_nodelay support from subscriber option
00208                 if 'tcp_nodelay' in header:
00209                     tcp_nodelay = True if header['tcp_nodelay'].strip() == '1' else False
00210                 else:
00211                     tcp_nodelay = self.tcp_nodelay_map.get(resolved_topic_name, False)
00212 
00213                 rospy.impl.tcpros_pubsub._configure_pub_socket(sock, tcp_nodelay)
00214                 protocol = TCPROSPub(resolved_topic_name, topic.data_class, is_latch=topic.is_latch, headers=topic.headers)
00215                 transport = TCPROSTransport(protocol, resolved_topic_name)
00216                 transport.set_socket(sock, header['callerid'])
00217                 transport.write_header()
00218                 topic.add_connection(transport)
00219             
00220 


rosproxy
Author(s): Ken Conley
autogenerated on Mon Dec 2 2013 13:03:48