00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 """Replacement for default rospy TCPROSHandler"""
00036
00037
00038
00039
00040
00041
00042
00043
00044 import rospy
00045 from rospy.core import rospyerr
00046 import rospy.exceptions
00047 import rospy.names
00048 import rospy.impl.transport
00049 from rospy.impl.tcpros_pubsub import TCPROSPub, TCPROSSub, TCPROSTransport
00050 from rospy.impl.tcpros_base import TCPROS
00051
00052 class ProxyTCPROSHandler(rospy.impl.transport.ProtocolHandler):
00053 """
00054 ROS Protocol handler for TCPROS. Accepts both TCPROS topic
00055 connections as well as ROS service connections over TCP. TCP server
00056 socket is run once start_server() is called -- this is implicitly
00057 called during init_publisher().
00058 """
00059
00060 def __init__(self, topic_manager, tcpros_server):
00061 """ctor"""
00062 self.tcp_nodelay_map = {}
00063 self.topic_manager = topic_manager
00064 self.tcpros_server = tcpros_server
00065
00066 def set_tcp_nodelay(self, resolved_name, tcp_nodelay):
00067 """
00068 @param resolved_name: resolved topic name
00069 @type resolved_name: str
00070
00071 @param tcp_nodelay: If True, sets TCP_NODELAY on publisher's
00072 socket (disables Nagle algorithm). This results in lower
00073 latency publishing at the cost of efficiency.
00074 @type tcp_nodelay: bool
00075 """
00076 self.tcp_nodelay_map[resolved_name] = tcp_nodelay
00077
00078 def shutdown(self):
00079 """
00080 stops the TCP/IP server responsible for receiving inbound connections
00081 """
00082 pass
00083
00084 def create_transport(self, resolved_name, pub_uri, protocol_params):
00085 """
00086 Connect to topic resolved_name on Publisher pub_uri using TCPROS.
00087 @param resolved_name str: resolved topic name
00088 @type resolved_name: str
00089 @param pub_uri: XML-RPC URI of publisher
00090 @type pub_uri: str
00091 @param protocol_params: protocol parameters to use for connecting
00092 @type protocol_params: [XmlRpcLegal]
00093 @return: code, message, debug
00094 @rtype: (int, str, int)
00095 """
00096
00097
00098 if type(protocol_params) != list or len(protocol_params) != 3:
00099 return 0, "ERROR: invalid TCPROS parameters", 0
00100 if protocol_params[0] != TCPROS:
00101 return 0, "INTERNAL ERROR: protocol id is not TCPROS: %s"%id, 0
00102 id, dest_addr, dest_port = protocol_params
00103
00104 sub = self.topic_manager.get_subscriber_impl(resolved_name)
00105
00106
00107 try:
00108 protocol = TCPROSSub(resolved_name, sub.data_class, \
00109 queue_size=sub.queue_size, buff_size=sub.buff_size,
00110 tcp_nodelay=sub.tcp_nodelay)
00111 conn = TCPROSTransport(protocol, resolved_name)
00112
00113 conn.connect(dest_addr, dest_port, pub_uri, timeout=60.)
00114 t = threading.Thread(name=resolved_name, target=conn.receive_loop, args=(sub.receive_callback,))
00115
00116
00117 t.start()
00118 except rospy.exceptions.TransportInitError, e:
00119 rospyerr("unable to create subscriber transport: %s", e)
00120 return 0, "Internal error creating inbound TCP connection for [%s]: %s"%(resolved_name, e), -1
00121
00122
00123 if sub.add_connection(conn):
00124 return 1, "Connected topic[%s]. Transport impl[%s]"%(resolved_name, conn.__class__.__name__), dest_port
00125 else:
00126 conn.close()
00127 return 0, "ERROR: Race condition failure: duplicate topic subscriber [%s] was created"%(resolved_name), 0
00128
00129 def supports(self, protocol):
00130 """
00131 @param protocol: name of protocol
00132 @type protocol: str
00133 @return: True if protocol is supported
00134 @rtype: bool
00135 """
00136 return protocol == TCPROS
00137
00138 def get_supported(self):
00139 """
00140 Get supported protocols
00141 """
00142 return [[TCPROS]]
00143
00144 def init_publisher(self, resolved_name, protocol):
00145 """
00146 Initialize this node to receive an inbound TCP connection,
00147 i.e. startup a TCP server if one is not already running.
00148
00149 @param resolved_name: topic name
00150 @type resolved__name: str
00151
00152 @param protocol: negotiated protocol
00153 parameters. protocol[0] must be the string 'TCPROS'
00154 @type protocol: [str, value*]
00155 @return: (code, msg, [TCPROS, addr, port])
00156 @rtype: (int, str, list)
00157 """
00158 if protocol[0] != TCPROS:
00159 return 0, "Internal error: protocol does not match TCPROS: %s"%protocol, []
00160
00161 addr, port = self.tcpros_server.get_address()
00162 return 1, "ready on %s:%s"%(addr, port), [TCPROS, addr, port]
00163
00164 def topic_connection_handler(self, sock, client_addr, header):
00165 """
00166 Process incoming topic connection. Reads in topic name from
00167 handshake and creates the appropriate L{TCPROSPub} handler for the
00168 connection.
00169 @param sock: socket connection
00170 @type sock: socket.socket
00171 @param client_addr: client address
00172 @type client_addr: (str, int)
00173 @param header: key/value pairs from handshake header
00174 @type header: dict
00175 @return: error string or None
00176 @rtype: str
00177 """
00178 for required in ['topic', 'md5sum', 'callerid']:
00179 if not required in header:
00180 return "Missing required '%s' field"%required
00181 else:
00182 resolved_topic_name = header['topic']
00183 md5sum = header['md5sum']
00184 tm = self.topic_manager
00185 topic = tm.get_publisher_impl(resolved_topic_name)
00186 if not topic:
00187 return "[%s] is not a publisher of [%s]. Topics are %s"%(rospy.names.get_caller_id(), resolved_topic_name, tm.get_publications())
00188 elif md5sum != rospy.names.TOPIC_ANYTYPE and md5sum != topic.data_class._md5sum:
00189
00190 actual_type = topic.data_class._type
00191
00192
00193
00194 if 'type' in header:
00195 requested_type = header['type']
00196 if requested_type != actual_type:
00197 return "topic types do not match: [%s] vs. [%s]"%(requested_type, actual_type)
00198 else:
00199
00200 requested_type = actual_type
00201
00202 return "Client [%s] wants topic [%s] to have datatype/md5sum [%s/%s], but our version has [%s/%s] Dropping connection."%(header['callerid'], resolved_topic_name, requested_type, md5sum, actual_type, topic.data_class._md5sum)
00203
00204 else:
00205
00206
00207
00208 if 'tcp_nodelay' in header:
00209 tcp_nodelay = True if header['tcp_nodelay'].strip() == '1' else False
00210 else:
00211 tcp_nodelay = self.tcp_nodelay_map.get(resolved_topic_name, False)
00212
00213 rospy.impl.tcpros_pubsub._configure_pub_socket(sock, tcp_nodelay)
00214 protocol = TCPROSPub(resolved_topic_name, topic.data_class, is_latch=topic.is_latch, headers=topic.headers)
00215 transport = TCPROSTransport(protocol, resolved_topic_name)
00216 transport.set_socket(sock, header['callerid'])
00217 transport.write_header()
00218 topic.add_connection(transport)
00219
00220