Package rospy :: Package impl :: Module udpros

Source Code for Module rospy.impl.udpros

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2008, Willow Garage, Inc. 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32  # 
 33  # Revision $Id: udpros.py 14612 2011-08-09 08:54:11Z kwc $ 
 34   
 35  from __future__ import print_function 
 36   
 37  """ 
 38  UDPROS connection protocol. 
 39  """ 
 40  ## UDPROS connection protocol. 
 41  #  http://ros.org/wiki/ROS/UDPROS 
 42  #  
 43   
 44  import roslib.network 
 45   
 46  import rospy.impl.registration 
 47  import rospy.impl.transport 
 48   
49 -def get_max_datagram_size():
50 #TODO 51 return 1024
52
53 -class UDPROSHandler(rospy.transport.ProtocolHandler):
54 """ 55 rospy protocol handler for UDPROS. Stores the datagram server if necessary. 56 """ 57
58 - def __init__(self, port=0):
59 """ 60 ctor 61 """ 62 self.port = port 63 self.buff_size = get_max_datagram_size()
64
65 - def init_server(self):
66 """ 67 Initialize and start the server thread, if not already initialized. 68 """ 69 if self.server is not None: 70 return 71 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 72 s.bind((roslib.network.get_bind_address(), self.port)) 73 if self.port == 0: 74 self.port = s.getsockname()[1] 75 self.server = s 76 threading.start_new_thread(self.run, ())
77
78 - def run(self):
79 buff_size = self.buff_size 80 try: 81 while not rospy.core.is_shutdown(): 82 data = self.server.recvfrom(self.buff_size) 83 print("received packet") 84 #TODO 85 except: 86 #TODO: log 87 pass
88
89 - def shutdown(self):
90 if self.sock is not None: 91 self.sock.close()
92
93 - def create_transport(self, topic_name, pub_uri, protocol_params):
94 """ 95 Connect to topic resolved_name on Publisher pub_uri using UDPROS. 96 @param resolved_name str: resolved topic name 97 @type resolved_name: str 98 @param pub_uri: XML-RPC URI of publisher 99 @type pub_uri: str 100 @param protocol_params: protocol parameters to use for connecting 101 @type protocol_params: [XmlRpcLegal] 102 @return: code, message, debug 103 @rtype: (int, str, int) 104 """ 105 106 #Validate protocol params = [UDPROS, address, port, headers] 107 if type(protocol_params) != list or len(protocol_params) != 4: 108 return 0, "ERROR: invalid UDPROS parameters", 0 109 if protocol_params[0] != UDPROS: 110 return 0, "INTERNAL ERROR: protocol id is not UDPROS: %s"%id, 0 111 112 #TODO: get connection_id and buffer size from params 113 id, dest_addr, dest_port, headers = protocol_params 114 115 self.init_server() 116 117 #TODO: parse/validate headers 118 119 sub = rospy.registration.get_topic_manager().get_subscriber_impl(topic_name) 120 # Create Transport 121 122 # TODO: create just a single 'connection' instance to represent 123 # all UDP connections. 'connection' can take care of unifying 124 # publication if addresses are the same 125 transport = UDPTransport(protocol, topic_name, sub.receive_callback) 126 127 # Attach connection to _SubscriberImpl 128 if sub.add_connection(transport): #pass udp connection to handler 129 return 1, "Connected topic[%s]. Transport impl[%s]"%(topic_name, transport.__class__.__name__), dest_port 130 else: 131 transport.close() 132 return 0, "ERROR: Race condition failure: duplicate topic subscriber [%s] was created"%(topic_name), 0
133
134 - def supports(self, protocol):
135 """ 136 @param protocol: name of protocol 137 @type protocol: str 138 @return: True if protocol is supported 139 @rtype: bool 140 """ 141 return protocol == UDPROS
142
143 - def get_supported(self):
144 """ 145 Get supported protocols 146 """ 147 return [[UDPROS]]
148
149 - def init_publisher(self, topic_name, protocol_params):
150 """ 151 Initialize this node to start publishing to a new UDP location. 152 153 @param resolved_name: topic name 154 @type resolved__name: str 155 156 @param protocol_params: requested protocol 157 parameters. protocol[0] must be the string 'UDPROS' 158 @type protocol_params: [str, value*] 159 @return: (code, msg, [UDPROS, addr, port]) 160 @rtype: (int, str, list) 161 """ 162 163 if protocol_params[0] != UDPROS: 164 return 0, "Internal error: protocol does not match UDPROS: %s"%protocol, [] 165 #TODO 166 _, header, host, port, max_datagram_size = protocol_params 167 #TODO: connection_id, max_datagraph_size 168 return 1, "ready", [UDPROS]
169
170 - def topic_connection_handler(self, sock, client_addr, header):
171 """ 172 Process incoming topic connection. Reads in topic name from 173 handshake and creates the appropriate L{TCPROSPub} handler for the 174 connection. 175 @param sock: socket connection 176 @type sock: socket.socket 177 @param client_addr: client address 178 @type client_addr: (str, int) 179 @param header: key/value pairs from handshake header 180 @type header: dict 181 @return: error string or None 182 @rtype: str 183 """ 184 for required in ['topic', 'md5sum', 'callerid']: 185 if not required in header: 186 return "Missing required '%s' field"%required 187 else: 188 resolved_topic_name = header['topic'] 189 md5sum = header['md5sum'] 190 tm = rospy.registration.get_topic_manager() 191 topic = tm.get_publisher_impl(resolved_topic_name) 192 if not topic: 193 return "[%s] is not a publisher of [%s]. Topics are %s"%(rospy.names.get_caller_id(), resolved_topic_name, tm.get_publications()) 194 elif md5sum != rospy.names.TOPIC_ANYTYPE and md5sum != topic.data_class._md5sum: 195 196 actual_type = topic.data_class._type 197 198 # check to see if subscriber sent 'type' header. If they did, check that 199 # types are same first as this provides a better debugging message 200 if 'type' in header: 201 requested_type = header['type'] 202 if requested_type != actual_type: 203 return "topic types do not match: [%s] vs. [%s]"%(requested_type, actual_type) 204 else: 205 # defaults to actual type 206 requested_type = actual_type 207 208 return "Client [%s] wants topic [%s] to have datatype/md5sum [%s/%s], but our version has [%s/%s] Dropping connection."%(header['callerid'], resolved_topic_name, requested_type, md5sum, actual_type, topic.data_class._md5sum) 209 210 else: 211 #TODO:POLLING if polling header is present, have to spin up receive loop as well 212 213 # #1334: tcp_nodelay support from subscriber option 214 if 'tcp_nodelay' in header: 215 tcp_nodelay = True if header['tcp_nodelay'].strip() == '1' else False 216 else: 217 tcp_nodelay = self.tcp_nodelay_map.get(resolved_topic_name, False) 218 219 _configure_pub_socket(sock, tcp_nodelay) 220 protocol = TCPROSPub(resolved_topic_name, topic.data_class, is_latch=topic.is_latch, headers=topic.headers) 221 transport = TCPROSTransport(protocol, resolved_topic_name) 222 transport.set_socket(sock, header['callerid']) 223 transport.write_header() 224 topic.add_connection(transport)
225 226 227 228 ## UDPROS communication routines
229 -class UDPROSTransport(rospy.transport.Transport):
230 transport_type = 'UDPROS' 231
232 - def __init__(self, protocol, name, header):
233 """ 234 ctor 235 @param name: topic name 236 @type name: str: 237 @param protocol: protocol implementation 238 @param protocol: UDPROSTransportProtocol 239 @param header: handshake header if transport handshake header was 240 already read off of transport. 241 @type header: dict 242 @throws TransportInitError: if transport cannot be initialized according to arguments 243 """ 244 super(UDPROSTransport, self).__init__(protocol.direction, name=name) 245 if not name: 246 raise TransportInitError("Unable to initialize transport: name is not set") 247 248 self.done = False 249 self.header = header
250
251 - def send_message(self, msg, seq):
252 """ 253 Convenience routine for services to send a message across a 254 particular connection. NOTE: write_data is much more efficient 255 if same message is being sent to multiple connections. Not 256 threadsafe. 257 @param msg: message to send 258 @type msg: Msg 259 @param seq: sequence number for message 260 @type seq: int 261 @raise TransportException: if error occurred sending message 262 """ 263 # this will call write_data(), so no need to keep track of stats 264 serialize_message(self.write_buff, seq, msg) 265 self.write_data(self.write_buff.getvalue()) 266 self.write_buff.truncate(0)
267
268 - def write_data(self, data):
269 """ 270 Write raw data to transport 271 @raise TransportInitialiationError: could not be initialized 272 @raise TransportTerminated: no longer open for publishing 273 """ 274 # TODO 275 # - cut into packets 276 # write to address 277 pass
278
279 - def receive_once(self):
280 """ 281 block until messages are read off of socket 282 @return: list of newly received messages 283 @rtype: [Msg] 284 @raise TransportException: if unable to receive message due to error 285 """ 286 pass
287 288 ## Receive messages until shutdown 289 ## @param self 290 ## @param msgs_callback fn([msg]): callback to invoke for new messages received
291 - def receive_loop(self, msgs_callback):
292 pass
293 294 ## close i/o and release resources
295 - def close(super):
296 self(UDPROSTransport, self).close() 297 #TODO 298 self.done = True
299 300 _handler = UDPROSHandler() 301
302 -def get_handler():
303 return _handler
304