Package rospy :: Package impl :: Module udpros

Source Code for Module rospy.impl.udpros

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2008, Willow Garage, Inc. 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32  # 
 33  # Revision $Id$ 
 34   
 35  from __future__ import print_function 
 36   
 37  """ 
 38  UDPROS connection protocol. 
 39  """ 
 40  ## UDPROS connection protocol. 
 41  #  http://ros.org/wiki/ROS/UDPROS 
 42  #  
 43   
 44  import socket 
 45  import threading 
 46  import rosgraph.network 
 47   
 48  import rospy.impl.registration 
 49  import rospy.impl.transport 
 50   
51 -def get_max_datagram_size():
52 #TODO 53 return 1024
54
55 -class UDPROSHandler(rospy.transport.ProtocolHandler):
56 """ 57 rospy protocol handler for UDPROS. Stores the datagram server if necessary. 58 """ 59
60 - def __init__(self, port=0):
61 """ 62 ctor 63 """ 64 self.port = port 65 self.buff_size = get_max_datagram_size()
66
67 - def init_server(self):
68 """ 69 Initialize and start the server thread, if not already initialized. 70 """ 71 if self.server is not None: 72 return 73 if rosgraph.network.use_ipv6(): 74 s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) 75 else: 76 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 77 s.bind((rosgraph.network.get_bind_address(), self.port)) 78 if self.port == 0: 79 self.port = s.getsockname()[1] 80 self.server = s 81 threading.start_new_thread(self.run, ())
82
83 - def run(self):
84 buff_size = self.buff_size 85 try: 86 while not rospy.core.is_shutdown(): 87 data = self.server.recvfrom(self.buff_size) 88 print("received packet") 89 #TODO 90 except: 91 #TODO: log 92 pass
93
94 - def shutdown(self):
95 if self.sock is not None: 96 self.sock.close()
97
98 - def create_transport(self, topic_name, pub_uri, protocol_params):
99 """ 100 Connect to topic resolved_name on Publisher pub_uri using UDPROS. 101 @param resolved_name str: resolved topic name 102 @type resolved_name: str 103 @param pub_uri: XML-RPC URI of publisher 104 @type pub_uri: str 105 @param protocol_params: protocol parameters to use for connecting 106 @type protocol_params: [XmlRpcLegal] 107 @return: code, message, debug 108 @rtype: (int, str, int) 109 """ 110 111 #Validate protocol params = [UDPROS, address, port, headers] 112 if type(protocol_params) != list or len(protocol_params) != 4: 113 return 0, "ERROR: invalid UDPROS parameters", 0 114 if protocol_params[0] != UDPROS: 115 return 0, "INTERNAL ERROR: protocol id is not UDPROS: %s"%id, 0 116 117 #TODO: get connection_id and buffer size from params 118 id, dest_addr, dest_port, headers = protocol_params 119 120 self.init_server() 121 122 #TODO: parse/validate headers 123 124 sub = rospy.registration.get_topic_manager().get_subscriber_impl(topic_name) 125 # Create Transport 126 127 # TODO: create just a single 'connection' instance to represent 128 # all UDP connections. 'connection' can take care of unifying 129 # publication if addresses are the same 130 transport = UDPTransport(protocol, topic_name, sub.receive_callback) 131 132 # Attach connection to _SubscriberImpl 133 if sub.add_connection(transport): #pass udp connection to handler 134 return 1, "Connected topic[%s]. Transport impl[%s]"%(topic_name, transport.__class__.__name__), dest_port 135 else: 136 transport.close() 137 return 0, "ERROR: Race condition failure: duplicate topic subscriber [%s] was created"%(topic_name), 0
138
139 - def supports(self, protocol):
140 """ 141 @param protocol: name of protocol 142 @type protocol: str 143 @return: True if protocol is supported 144 @rtype: bool 145 """ 146 return protocol == UDPROS
147
148 - def get_supported(self):
149 """ 150 Get supported protocols 151 """ 152 return [[UDPROS]]
153
154 - def init_publisher(self, topic_name, protocol_params):
155 """ 156 Initialize this node to start publishing to a new UDP location. 157 158 @param resolved_name: topic name 159 @type resolved__name: str 160 161 @param protocol_params: requested protocol 162 parameters. protocol[0] must be the string 'UDPROS' 163 @type protocol_params: [str, value*] 164 @return: (code, msg, [UDPROS, addr, port]) 165 @rtype: (int, str, list) 166 """ 167 168 if protocol_params[0] != UDPROS: 169 return 0, "Internal error: protocol does not match UDPROS: %s"%protocol, [] 170 #TODO 171 _, header, host, port, max_datagram_size = protocol_params 172 #TODO: connection_id, max_datagraph_size 173 return 1, "ready", [UDPROS]
174
175 - def topic_connection_handler(self, sock, client_addr, header):
176 """ 177 Process incoming topic connection. Reads in topic name from 178 handshake and creates the appropriate L{TCPROSPub} handler for the 179 connection. 180 @param sock: socket connection 181 @type sock: socket.socket 182 @param client_addr: client address 183 @type client_addr: (str, int) 184 @param header: key/value pairs from handshake header 185 @type header: dict 186 @return: error string or None 187 @rtype: str 188 """ 189 for required in ['topic', 'md5sum', 'callerid']: 190 if not required in header: 191 return "Missing required '%s' field"%required 192 else: 193 resolved_topic_name = header['topic'] 194 md5sum = header['md5sum'] 195 tm = rospy.registration.get_topic_manager() 196 topic = tm.get_publisher_impl(resolved_topic_name) 197 if not topic: 198 return "[%s] is not a publisher of [%s]. Topics are %s"%(rospy.names.get_caller_id(), resolved_topic_name, tm.get_publications()) 199 elif md5sum != rospy.names.TOPIC_ANYTYPE and md5sum != topic.data_class._md5sum: 200 201 actual_type = topic.data_class._type 202 203 # check to see if subscriber sent 'type' header. If they did, check that 204 # types are same first as this provides a better debugging message 205 if 'type' in header: 206 requested_type = header['type'] 207 if requested_type != actual_type: 208 return "topic types do not match: [%s] vs. [%s]"%(requested_type, actual_type) 209 else: 210 # defaults to actual type 211 requested_type = actual_type 212 213 return "Client [%s] wants topic [%s] to have datatype/md5sum [%s/%s], but our version has [%s/%s] Dropping connection."%(header['callerid'], resolved_topic_name, requested_type, md5sum, actual_type, topic.data_class._md5sum) 214 215 else: 216 #TODO:POLLING if polling header is present, have to spin up receive loop as well 217 218 # #1334: tcp_nodelay support from subscriber option 219 if 'tcp_nodelay' in header: 220 tcp_nodelay = True if header['tcp_nodelay'].strip() == '1' else False 221 else: 222 tcp_nodelay = self.tcp_nodelay_map.get(resolved_topic_name, False) 223 224 _configure_pub_socket(sock, tcp_nodelay) 225 protocol = TCPROSPub(resolved_topic_name, topic.data_class, is_latch=topic.is_latch, headers=topic.headers) 226 transport = TCPROSTransport(protocol, resolved_topic_name) 227 transport.set_socket(sock, header['callerid']) 228 transport.write_header() 229 topic.add_connection(transport)
230 231 232 233 ## UDPROS communication routines
234 -class UDPROSTransport(rospy.transport.Transport):
235 transport_type = 'UDPROS' 236
237 - def __init__(self, protocol, name, header):
238 """ 239 ctor 240 @param name: topic name 241 @type name: str: 242 @param protocol: protocol implementation 243 @param protocol: UDPROSTransportProtocol 244 @param header: handshake header if transport handshake header was 245 already read off of transport. 246 @type header: dict 247 @throws TransportInitError: if transport cannot be initialized according to arguments 248 """ 249 super(UDPROSTransport, self).__init__(protocol.direction, name=name) 250 if not name: 251 raise TransportInitError("Unable to initialize transport: name is not set") 252 253 self.done = False 254 self.header = header
255
256 - def send_message(self, msg, seq):
257 """ 258 Convenience routine for services to send a message across a 259 particular connection. NOTE: write_data is much more efficient 260 if same message is being sent to multiple connections. Not 261 threadsafe. 262 @param msg: message to send 263 @type msg: Msg 264 @param seq: sequence number for message 265 @type seq: int 266 @raise TransportException: if error occurred sending message 267 """ 268 # this will call write_data(), so no need to keep track of stats 269 serialize_message(self.write_buff, seq, msg) 270 self.write_data(self.write_buff.getvalue()) 271 self.write_buff.truncate(0) 272 self.write_buff.seek(0)
273
274 - def write_data(self, data):
275 """ 276 Write raw data to transport 277 @raise TransportInitialiationError: could not be initialized 278 @raise TransportTerminated: no longer open for publishing 279 """ 280 # TODO 281 # - cut into packets 282 # write to address 283 pass
284
285 - def receive_once(self):
286 """ 287 block until messages are read off of socket 288 @return: list of newly received messages 289 @rtype: [Msg] 290 @raise TransportException: if unable to receive message due to error 291 """ 292 pass
293 294 ## Receive messages until shutdown 295 ## @param self 296 ## @param msgs_callback fn([msg]): callback to invoke for new messages received
297 - def receive_loop(self, msgs_callback):
298 pass
299 300 ## close i/o and release resources
301 - def close(super):
302 self(UDPROSTransport, self).close() 303 #TODO 304 self.done = True
305 306 _handler = UDPROSHandler() 307
308 -def get_handler():
309 return _handler
310