Package rospy :: Package impl :: Module udpros

Source Code for Module rospy.impl.udpros

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2008, Willow Garage, Inc. 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32  # 
 33  # Revision $Id: udpros.py 9434 2010-04-28 00:20:16Z kwc $ 
 34  """ 
 35  UDPROS connection protocol. 
 36  """ 
 37  ## UDPROS connection protocol. 
 38  #  http://ros.org/wiki/ROS/UDPROS 
 39  #  
 40   
 41  import roslib.network 
 42   
 43  import rospy.impl.registration 
 44  import rospy.impl.transport 
 45   
46 -def get_max_datagram_size():
47 #TODO 48 return 1024
49
50 -class UDPROSHandler(rospy.transport.ProtocolHandler):
51 """ 52 rospy protocol handler for UDPROS. Stores the datagram server if necessary. 53 """ 54
55 - def __init__(self, port=0):
56 """ 57 ctor 58 """ 59 self.port = port 60 self.buff_size = get_max_datagram_size()
61
62 - def init_server(self):
63 """ 64 Initialize and start the server thread, if not already initialized. 65 """ 66 if self.server is not None: 67 return 68 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 69 s.bind((roslib.network.get_bind_address(), self.port)) 70 if self.port == 0: 71 self.port = s.getsockname()[1] 72 self.server = s 73 threading.start_new_thread(self.run, ())
74
75 - def run(self):
76 buff_size = self.buff_size 77 try: 78 while not rospy.core.is_shutdown(): 79 data = self.server.recvfrom(self.buff_size) 80 print "received packet" 81 #TODO 82 except: 83 #TODO: log 84 pass
85
86 - def shutdown(self):
87 if self.sock is not None: 88 self.sock.close()
89
90 - def create_transport(self, topic_name, pub_uri, protocol_params):
91 """ 92 Connect to topic resolved_name on Publisher pub_uri using UDPROS. 93 @param resolved_name str: resolved topic name 94 @type resolved_name: str 95 @param pub_uri: XML-RPC URI of publisher 96 @type pub_uri: str 97 @param protocol_params: protocol parameters to use for connecting 98 @type protocol_params: [XmlRpcLegal] 99 @return: code, message, debug 100 @rtype: (int, str, int) 101 """ 102 103 #Validate protocol params = [UDPROS, address, port, headers] 104 if type(protocol_params) != list or len(protocol_params) != 4: 105 return 0, "ERROR: invalid UDPROS parameters", 0 106 if protocol_params[0] != UDPROS: 107 return 0, "INTERNAL ERROR: protocol id is not UDPROS: %s"%id, 0 108 109 #TODO: get connection_id and buffer size from params 110 id, dest_addr, dest_port, headers = protocol_params 111 112 self.init_server() 113 114 #TODO: parse/validate headers 115 116 sub = rospy.registration.get_topic_manager().get_subscriber_impl(topic_name) 117 # Create Transport 118 119 # TODO: create just a single 'connection' instance to represent 120 # all UDP connections. 'connection' can take care of unifying 121 # publication if addresses are the same 122 transport = UDPTransport(protocol, topic_name, sub.receive_callback) 123 124 # Attach connection to _SubscriberImpl 125 if sub.add_connection(transport): #pass udp connection to handler 126 return 1, "Connected topic[%s]. Transport impl[%s]"%(topic_name, transport.__class__.__name__), dest_port 127 else: 128 transport.close() 129 return 0, "ERROR: Race condition failure: duplicate topic subscriber [%s] was created"%(topic_name), 0
130
131 - def supports(self, protocol):
132 """ 133 @param protocol: name of protocol 134 @type protocol: str 135 @return: True if protocol is supported 136 @rtype: bool 137 """ 138 return protocol == UDPROS
139
140 - def get_supported(self):
141 """ 142 Get supported protocols 143 """ 144 return [[UDPROS]]
145
146 - def init_publisher(self, topic_name, protocol_params):
147 """ 148 Initialize this node to start publishing to a new UDP location. 149 150 @param resolved_name: topic name 151 @type resolved__name: str 152 153 @param protocol_params: requested protocol 154 parameters. protocol[0] must be the string 'UDPROS' 155 @type protocol_params: [str, value*] 156 @return: (code, msg, [UDPROS, addr, port]) 157 @rtype: (int, str, list) 158 """ 159 160 if protocol_params[0] != UDPROS: 161 return 0, "Internal error: protocol does not match UDPROS: %s"%protocol, [] 162 #TODO 163 _, header, host, port, max_datagram_size = protocol_params 164 #TODO: connection_id, max_datagraph_size 165 return 1, "ready", [UDPROS]
166
167 - def topic_connection_handler(self, sock, client_addr, header):
168 """ 169 Process incoming topic connection. Reads in topic name from 170 handshake and creates the appropriate L{TCPROSPub} handler for the 171 connection. 172 @param sock: socket connection 173 @type sock: socket.socket 174 @param client_addr: client address 175 @type client_addr: (str, int) 176 @param header: key/value pairs from handshake header 177 @type header: dict 178 @return: error string or None 179 @rtype: str 180 """ 181 for required in ['topic', 'md5sum', 'callerid']: 182 if not required in header: 183 return "Missing required '%s' field"%required 184 else: 185 resolved_topic_name = header['topic'] 186 md5sum = header['md5sum'] 187 tm = rospy.registration.get_topic_manager() 188 topic = tm.get_publisher_impl(resolved_topic_name) 189 if not topic: 190 return "[%s] is not a publisher of [%s]. Topics are %s"%(rospy.names.get_caller_id(), resolved_topic_name, tm.get_publications()) 191 elif md5sum != rospy.names.TOPIC_ANYTYPE and md5sum != topic.data_class._md5sum: 192 193 actual_type = topic.data_class._type 194 195 # check to see if subscriber sent 'type' header. If they did, check that 196 # types are same first as this provides a better debugging message 197 if 'type' in header: 198 requested_type = header['type'] 199 if requested_type != actual_type: 200 return "topic types do not match: [%s] vs. [%s]"%(requested_type, actual_type) 201 else: 202 # defaults to actual type 203 requested_type = actual_type 204 205 return "Client [%s] wants topic [%s] to have datatype/md5sum [%s/%s], but our version has [%s/%s] Dropping connection."%(header['callerid'], resolved_topic_name, requested_type, md5sum, actual_type, topic.data_class._md5sum) 206 207 else: 208 #TODO:POLLING if polling header is present, have to spin up receive loop as well 209 210 # #1334: tcp_nodelay support from subscriber option 211 if 'tcp_nodelay' in header: 212 tcp_nodelay = True if header['tcp_nodelay'].strip() == '1' else False 213 else: 214 tcp_nodelay = self.tcp_nodelay_map.get(resolved_topic_name, False) 215 216 _configure_pub_socket(sock, tcp_nodelay) 217 protocol = TCPROSPub(resolved_topic_name, topic.data_class, is_latch=topic.is_latch, headers=topic.headers) 218 transport = TCPROSTransport(protocol, resolved_topic_name) 219 transport.set_socket(sock, header['callerid']) 220 transport.write_header() 221 topic.add_connection(transport)
222 223 224 225 ## UDPROS communication routines
226 -class UDPROSTransport(rospy.transport.Transport):
227 transport_type = 'UDPROS' 228
229 - def __init__(self, protocol, name, header):
230 """ 231 ctor 232 @param name: topic name 233 @type name: str: 234 @param protocol: protocol implementation 235 @param protocol: UDPROSTransportProtocol 236 @param header: handshake header if transport handshake header was 237 already read off of transport. 238 @type header: dict 239 @throws TransportInitError: if transport cannot be initialized according to arguments 240 """ 241 super(UDPROSTransport, self).__init__(protocol.direction, name=name) 242 if not name: 243 raise TransportInitError("Unable to initialize transport: name is not set") 244 245 self.done = False 246 self.header = header
247
248 - def send_message(self, msg, seq):
249 """ 250 Convenience routine for services to send a message across a 251 particular connection. NOTE: write_data is much more efficient 252 if same message is being sent to multiple connections. Not 253 threadsafe. 254 @param msg: message to send 255 @type msg: Msg 256 @param seq: sequence number for message 257 @type seq: int 258 @raise TransportException: if error occurred sending message 259 """ 260 # this will call write_data(), so no need to keep track of stats 261 serialize_message(self.write_buff, seq, msg) 262 self.write_data(self.write_buff.getvalue()) 263 self.write_buff.truncate(0)
264
265 - def write_data(self, data):
266 """ 267 Write raw data to transport 268 @raise TransportInitialiationError: could not be initialized 269 @raise TransportTerminated: no longer open for publishing 270 """ 271 # TODO 272 # - cut into packets 273 # write to address 274 pass
275
276 - def receive_once(self):
277 """ 278 block until messages are read off of socket 279 @return: list of newly received messages 280 @rtype: [Msg] 281 @raise TransportException: if unable to receive message due to error 282 """ 283 pass
284 285 ## Receive messages until shutdown 286 ## @param self 287 ## @param msgs_callback fn([msg]): callback to invoke for new messages received
288 - def receive_loop(self, msgs_callback):
289 pass
290 291 ## close i/o and release resources
292 - def close(super):
293 self(UDPROSTransport, self).close() 294 #TODO 295 self.done = True
296 297 _handler = UDPROSHandler() 298
299 -def get_handler():
300 return _handler
301