Package rospy :: Package impl :: Module udpros

Source Code for Module rospy.impl.udpros

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2008, Willow Garage, Inc. 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32  # 
 33  # Revision $Id$ 
 34   
 35  from __future__ import print_function 
 36   
 37  """ 
 38  UDPROS connection protocol. 
 39  """ 
 40  ## UDPROS connection protocol. 
 41  #  http://ros.org/wiki/ROS/UDPROS 
 42  #  
 43   
 44  import rosgraph.network 
 45   
 46  import rospy.impl.registration 
 47  import rospy.impl.transport 
 48   
49 -def get_max_datagram_size():
50 #TODO 51 return 1024
52
53 -class UDPROSHandler(rospy.transport.ProtocolHandler):
54 """ 55 rospy protocol handler for UDPROS. Stores the datagram server if necessary. 56 """ 57
58 - def __init__(self, port=0):
59 """ 60 ctor 61 """ 62 self.port = port 63 self.buff_size = get_max_datagram_size()
64
65 - def init_server(self):
66 """ 67 Initialize and start the server thread, if not already initialized. 68 """ 69 if self.server is not None: 70 return 71 if rosgraph.network.use_ipv6(): 72 s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) 73 else: 74 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 75 s.bind((rosgraph.network.get_bind_address(), self.port)) 76 if self.port == 0: 77 self.port = s.getsockname()[1] 78 self.server = s 79 threading.start_new_thread(self.run, ())
80
81 - def run(self):
82 buff_size = self.buff_size 83 try: 84 while not rospy.core.is_shutdown(): 85 data = self.server.recvfrom(self.buff_size) 86 print("received packet") 87 #TODO 88 except: 89 #TODO: log 90 pass
91
92 - def shutdown(self):
93 if self.sock is not None: 94 self.sock.close()
95
96 - def create_transport(self, topic_name, pub_uri, protocol_params):
97 """ 98 Connect to topic resolved_name on Publisher pub_uri using UDPROS. 99 @param resolved_name str: resolved topic name 100 @type resolved_name: str 101 @param pub_uri: XML-RPC URI of publisher 102 @type pub_uri: str 103 @param protocol_params: protocol parameters to use for connecting 104 @type protocol_params: [XmlRpcLegal] 105 @return: code, message, debug 106 @rtype: (int, str, int) 107 """ 108 109 #Validate protocol params = [UDPROS, address, port, headers] 110 if type(protocol_params) != list or len(protocol_params) != 4: 111 return 0, "ERROR: invalid UDPROS parameters", 0 112 if protocol_params[0] != UDPROS: 113 return 0, "INTERNAL ERROR: protocol id is not UDPROS: %s"%id, 0 114 115 #TODO: get connection_id and buffer size from params 116 id, dest_addr, dest_port, headers = protocol_params 117 118 self.init_server() 119 120 #TODO: parse/validate headers 121 122 sub = rospy.registration.get_topic_manager().get_subscriber_impl(topic_name) 123 # Create Transport 124 125 # TODO: create just a single 'connection' instance to represent 126 # all UDP connections. 'connection' can take care of unifying 127 # publication if addresses are the same 128 transport = UDPTransport(protocol, topic_name, sub.receive_callback) 129 130 # Attach connection to _SubscriberImpl 131 if sub.add_connection(transport): #pass udp connection to handler 132 return 1, "Connected topic[%s]. Transport impl[%s]"%(topic_name, transport.__class__.__name__), dest_port 133 else: 134 transport.close() 135 return 0, "ERROR: Race condition failure: duplicate topic subscriber [%s] was created"%(topic_name), 0
136
137 - def supports(self, protocol):
138 """ 139 @param protocol: name of protocol 140 @type protocol: str 141 @return: True if protocol is supported 142 @rtype: bool 143 """ 144 return protocol == UDPROS
145
146 - def get_supported(self):
147 """ 148 Get supported protocols 149 """ 150 return [[UDPROS]]
151
152 - def init_publisher(self, topic_name, protocol_params):
153 """ 154 Initialize this node to start publishing to a new UDP location. 155 156 @param resolved_name: topic name 157 @type resolved__name: str 158 159 @param protocol_params: requested protocol 160 parameters. protocol[0] must be the string 'UDPROS' 161 @type protocol_params: [str, value*] 162 @return: (code, msg, [UDPROS, addr, port]) 163 @rtype: (int, str, list) 164 """ 165 166 if protocol_params[0] != UDPROS: 167 return 0, "Internal error: protocol does not match UDPROS: %s"%protocol, [] 168 #TODO 169 _, header, host, port, max_datagram_size = protocol_params 170 #TODO: connection_id, max_datagraph_size 171 return 1, "ready", [UDPROS]
172
173 - def topic_connection_handler(self, sock, client_addr, header):
174 """ 175 Process incoming topic connection. Reads in topic name from 176 handshake and creates the appropriate L{TCPROSPub} handler for the 177 connection. 178 @param sock: socket connection 179 @type sock: socket.socket 180 @param client_addr: client address 181 @type client_addr: (str, int) 182 @param header: key/value pairs from handshake header 183 @type header: dict 184 @return: error string or None 185 @rtype: str 186 """ 187 for required in ['topic', 'md5sum', 'callerid']: 188 if not required in header: 189 return "Missing required '%s' field"%required 190 else: 191 resolved_topic_name = header['topic'] 192 md5sum = header['md5sum'] 193 tm = rospy.registration.get_topic_manager() 194 topic = tm.get_publisher_impl(resolved_topic_name) 195 if not topic: 196 return "[%s] is not a publisher of [%s]. Topics are %s"%(rospy.names.get_caller_id(), resolved_topic_name, tm.get_publications()) 197 elif md5sum != rospy.names.TOPIC_ANYTYPE and md5sum != topic.data_class._md5sum: 198 199 actual_type = topic.data_class._type 200 201 # check to see if subscriber sent 'type' header. If they did, check that 202 # types are same first as this provides a better debugging message 203 if 'type' in header: 204 requested_type = header['type'] 205 if requested_type != actual_type: 206 return "topic types do not match: [%s] vs. [%s]"%(requested_type, actual_type) 207 else: 208 # defaults to actual type 209 requested_type = actual_type 210 211 return "Client [%s] wants topic [%s] to have datatype/md5sum [%s/%s], but our version has [%s/%s] Dropping connection."%(header['callerid'], resolved_topic_name, requested_type, md5sum, actual_type, topic.data_class._md5sum) 212 213 else: 214 #TODO:POLLING if polling header is present, have to spin up receive loop as well 215 216 # #1334: tcp_nodelay support from subscriber option 217 if 'tcp_nodelay' in header: 218 tcp_nodelay = True if header['tcp_nodelay'].strip() == '1' else False 219 else: 220 tcp_nodelay = self.tcp_nodelay_map.get(resolved_topic_name, False) 221 222 _configure_pub_socket(sock, tcp_nodelay) 223 protocol = TCPROSPub(resolved_topic_name, topic.data_class, is_latch=topic.is_latch, headers=topic.headers) 224 transport = TCPROSTransport(protocol, resolved_topic_name) 225 transport.set_socket(sock, header['callerid']) 226 transport.write_header() 227 topic.add_connection(transport)
228 229 230 231 ## UDPROS communication routines
232 -class UDPROSTransport(rospy.transport.Transport):
233 transport_type = 'UDPROS' 234
235 - def __init__(self, protocol, name, header):
236 """ 237 ctor 238 @param name: topic name 239 @type name: str: 240 @param protocol: protocol implementation 241 @param protocol: UDPROSTransportProtocol 242 @param header: handshake header if transport handshake header was 243 already read off of transport. 244 @type header: dict 245 @throws TransportInitError: if transport cannot be initialized according to arguments 246 """ 247 super(UDPROSTransport, self).__init__(protocol.direction, name=name) 248 if not name: 249 raise TransportInitError("Unable to initialize transport: name is not set") 250 251 self.done = False 252 self.header = header
253
254 - def send_message(self, msg, seq):
255 """ 256 Convenience routine for services to send a message across a 257 particular connection. NOTE: write_data is much more efficient 258 if same message is being sent to multiple connections. Not 259 threadsafe. 260 @param msg: message to send 261 @type msg: Msg 262 @param seq: sequence number for message 263 @type seq: int 264 @raise TransportException: if error occurred sending message 265 """ 266 # this will call write_data(), so no need to keep track of stats 267 serialize_message(self.write_buff, seq, msg) 268 self.write_data(self.write_buff.getvalue()) 269 self.write_buff.truncate(0)
270
271 - def write_data(self, data):
272 """ 273 Write raw data to transport 274 @raise TransportInitialiationError: could not be initialized 275 @raise TransportTerminated: no longer open for publishing 276 """ 277 # TODO 278 # - cut into packets 279 # write to address 280 pass
281
282 - def receive_once(self):
283 """ 284 block until messages are read off of socket 285 @return: list of newly received messages 286 @rtype: [Msg] 287 @raise TransportException: if unable to receive message due to error 288 """ 289 pass
290 291 ## Receive messages until shutdown 292 ## @param self 293 ## @param msgs_callback fn([msg]): callback to invoke for new messages received
294 - def receive_loop(self, msgs_callback):
295 pass
296 297 ## close i/o and release resources
298 - def close(super):
299 self(UDPROSTransport, self).close() 300 #TODO 301 self.done = True
302 303 _handler = UDPROSHandler() 304
305 -def get_handler():
306 return _handler
307