$search
00001 # Software License Agreement (BSD License) 00002 # 00003 # Copyright (c) 2010, Willow Garage, Inc. 00004 # All rights reserved. 00005 # 00006 # Redistribution and use in source and binary forms, with or without 00007 # modification, are permitted provided that the following conditions 00008 # are met: 00009 # 00010 # * Redistributions of source code must retain the above copyright 00011 # notice, this list of conditions and the following disclaimer. 00012 # * Redistributions in binary form must reproduce the above 00013 # copyright notice, this list of conditions and the following 00014 # disclaimer in the documentation and/or other materials provided 00015 # with the distribution. 00016 # * Neither the name of Willow Garage, Inc. nor the names of its 00017 # contributors may be used to endorse or promote products derived 00018 # from this software without specific prior written permission. 00019 # 00020 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 00021 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 00022 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 00023 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 00024 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 00025 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 00026 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00027 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 00028 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 00029 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 00030 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00031 # POSSIBILITY OF SUCH DAMAGE. 00032 # 00033 # Revision $Id: __init__.py 11414 2010-10-05 23:39:47Z tfield $ 00034 00035 """Node that proxies ROS topics and services in order to bridge firewalls""" 00036 00037 import rospy 00038 00039 import rostopic 00040 import rosservice 00041 00042 import rosproxy.handler 00043 import rosproxy.tcpros 00044 00045 # for service_connection_handler 00046 import roslib.message 00047 import rospy.names 00048 import thread 00049 from rospy.impl.registration import Registration 00050 from rospy.impl.tcpros_base import TCPROSTransport, TCPROSTransportProtocol, TCPROSServer 00051 from rospy.service import ServiceManager 00052 from rospy.impl.tcpros_service import ServiceImpl 00053 from roslib.xmlrpc import XmlRpcNode 00054 00055 from new import classobj 00056 00057 import rospy.impl.registration 00058 00059 def pub_forwarder(msg, pub): 00060 """ 00061 Forward a message to a supplied publisher 00062 """ 00063 pub.publish(msg) 00064 00065 # variant of AnyMsg that initializes from a msg_class 00066 class PassthroughServiceMessage(roslib.message.Message): 00067 00068 def __init__(self): 00069 self._buff = None 00070 00071 def serialize(self, buff): 00072 if self._buff is None: 00073 raise rospy.exceptions("AnyMsg is not initialized") 00074 else: 00075 buff.write(self._buff) 00076 00077 def deserialize(self, str): 00078 self._buff = str 00079 return self 00080 00081 class Proxy(object): 00082 00083 def __init__(self): 00084 00085 self.services = {} 00086 00087 self.service_manager = ServiceManager(registration_listeners=rospy.impl.registration.RegistrationListeners()) 00088 self.topic_manager = rospy.topics._TopicManager() 00089 00090 self.subs_internal = {} 00091 00092 self.external_node = None 00093 self.external_tcpros = None 00094 00095 def connect(self): 00096 self.configure_internal() 00097 self.configure_external() 00098 00099 def _configure_proxy_services(self, external_tcpros): 00100 external_tcpros.service_connection_handler = self.service_connection_handler 00101 00102 # override the registration listeners to do nothing so we don't appear to be actually registering this listeners 00103 # create proxy handlers for each service 00104 i = 0 00105 for name, proxy in self.services.iteritems(): 00106 00107 i += 1 00108 c = proxy.service_class 00109 service = ServiceImpl(name, proxy.service_class, proxy) 00110 00111 rospy.loginfo("registering proxied service %s"%(name)) 00112 self.service_manager.register(name, service) 00113 00114 def _configure_proxy_topics(self, external_tcpros, tcpros_handler): 00115 external_tcpros.topic_connection_handler = tcpros_handler.topic_connection_handler 00116 00117 import rospy.topics 00118 00119 # create publishers for each of the topics we internally subscribe to 00120 subs_initialized = {} 00121 for resolved_name, sub in self.subs_internal.iteritems(): 00122 # create a pub handle 00123 rospy.loginfo("create external publisher [%s]", resolved_name) 00124 pub = self.topic_manager.acquire_impl(Registration.PUB, resolved_name, sub.data_class) 00125 # clone the subscriber handle, this time with a callback 00126 sub_ex = rospy.Subscriber(resolved_name, sub.data_class, pub_forwarder, pub) 00127 subs_initialized[resolved_name] = sub_ex 00128 # decrement the ref count of the old handle 00129 #sub.unregister() 00130 00131 # throw away old subs_internal dictionary and replace it with 00132 # the initialized objects 00133 del self.subs_internal 00134 self.subs_internal = subs_initialized 00135 00136 00137 def configure_external(self): 00138 00139 00140 # Start another TCPROSServer to handle requests on the external port 00141 # - TODO: allow handlers these to be passed into TCPROSServer constructor 00142 self.external_tcpros = external_tcpros = TCPROSServer() 00143 tcpros_handler = rosproxy.tcpros.ProxyTCPROSHandler(self.topic_manager, external_tcpros) 00144 00145 self._configure_proxy_services(external_tcpros) 00146 self._configure_proxy_topics(external_tcpros, tcpros_handler) 00147 00148 tcpros_port = rospy.get_param('~tcpros_port', 11312) 00149 xmlrpc_port = rospy.get_param('~xmlrpc_port', 11313) 00150 00151 rospy.loginfo("reading port configuration: TCPROS[%s] XMLRPC[%s]"%(tcpros_port, xmlrpc_port)) 00152 00153 external_tcpros.start_server(port=tcpros_port) 00154 00155 # TODO: this may report the address of the wrong interface 00156 rospy.loginfo("ROSRPC URI is rosrpc://%s:%s"%(roslib.network.get_local_address(), tcpros_port)) 00157 00158 # Startup XMLRPC interface so we can do pub/sub 00159 master_uri = roslib.rosenv.get_master_uri() 00160 name = 'proxy-proxy' 00161 00162 00163 protocol_handlers = [tcpros_handler] 00164 rpc_handler = rosproxy.handler.ProxyHandler(name, master_uri, self.topic_manager, protocol_handlers) 00165 self.external_node = external_node= XmlRpcNode(xmlrpc_port, rpc_handler) 00166 00167 # - start the node and wait for init 00168 external_node.start() 00169 import time 00170 timeout_t = time.time() + 10. 00171 while time.time() < timeout_t and external_node.uri is None: 00172 time.sleep(0.01) 00173 00174 rospy.loginfo("XMLRPC interface is up %s"%self.external_node.uri) 00175 00176 00177 def _configure_internal_services(self, service_names): 00178 """ 00179 Create rospy handles to all of the services that we are 00180 configured to be able to call. 00181 """ 00182 00183 i = 0 00184 for name in service_names: 00185 rospy.loginfo("configuring service %s", name) 00186 resolved_name = rospy.resolve_name(name) 00187 rospy.wait_for_service(name, timeout=60.) 00188 type_ = rosservice.get_service_type(resolved_name) 00189 if type_ is None: 00190 raise rospy.ROSInitException("cannot proxy service [%s]: unknown type"%resolved_name) 00191 00192 i += 1 00193 00194 # for efficiency, we generate a passthrough service 00195 # definition that does not do any serializatoin on the 00196 # request or response. This requires more work because the 00197 # instantiated class has to have the correct properties. 00198 real_service_class = roslib.message.get_service_class(type_) 00199 real_req = real_service_class._request_class 00200 real_resp = real_service_class._response_class 00201 request_d = { 00202 '_type': real_req._type, 00203 '_md5sum': real_req._md5sum, 00204 '_full_text': real_req._full_text, 00205 } 00206 response_d = { 00207 '_type': real_resp._type, 00208 '_md5sum': real_resp._md5sum, 00209 '_full_text': real_resp._full_text, 00210 } 00211 service_class = classobj('s_passthrough_%s'%i, (roslib.message.ServiceDefinition,), { 00212 '_type' : real_service_class._type, 00213 '_md5sum' : real_service_class._md5sum, 00214 '_request_class' : classobj('passthrough_request_%s'%i, (PassthroughServiceMessage, ), request_d), 00215 '_response_class' : classobj('passthrough_response_%s'%i, (PassthroughServiceMessage,), response_d), 00216 }) 00217 00218 self.services[resolved_name] = rospy.ServiceProxy(name, service_class, persistent=True) 00219 00220 rospy.loginfo("proxying %s", resolved_name) 00221 00222 def _configure_internal_topics(self, pub_names): 00223 """ 00224 Create subscriptions to all the topics that we externally publish. 00225 """ 00226 00227 i = 0 00228 for name in pub_names: 00229 resolved_name = rospy.resolve_name(name) 00230 rospy.loginfo("configuring internal subscriber [%s]", resolved_name) 00231 00232 try: 00233 real_msg_class, _, _ = rostopic.get_topic_class(resolved_name) 00234 except rostopic.ROSTopicException: 00235 raise rospy.ROSInitException("cannot proxy subscription [%s]: unknown topic type"%resolved_name) 00236 00237 i += 1 00238 topic_class = classobj('t_passthrough_%s'%i, (rospy.msg.AnyMsg,), { 00239 '_type' : real_msg_class._type, 00240 '_md5sum' : real_msg_class._md5sum, 00241 }) 00242 self.subs_internal[resolved_name] = rospy.Subscriber(name, topic_class) 00243 00244 rospy.loginfo("proxying %s", resolved_name) 00245 00246 00247 def configure_internal(self): 00248 """ 00249 Bring up connections to internal ROS graph 00250 """ 00251 00252 rospy.init_node('proxy') 00253 00254 # fetch all the parameters for our node 00255 00256 service_names = rospy.get_param('~services', {}) 00257 self._configure_internal_services(service_names) 00258 00259 pub_names = rospy.get_param('~pubs', {}) 00260 self._configure_internal_topics(pub_names) 00261 00262 00263 def service_connection_handler(self, sock, client_addr, header): 00264 """ 00265 @param sock: socket connection 00266 @type sock: socket 00267 @param client_addr: client address 00268 @type client_addr: (str, int) 00269 @param header: key/value pairs from handshake header 00270 @type header: dict 00271 @return: error string or None 00272 @rtype: str 00273 """ 00274 00275 # This is a cturtle hack. rospy's service_connection_handler 00276 # is wired to the ServiceManager singleton. If we replace the 00277 # singleton with something more configurable, then we simply 00278 # have to run our own ServiceManager to handle the forwarding 00279 # behavior. 00280 00281 for required in ['service', 'md5sum', 'callerid']: 00282 if not required in header: 00283 return "Missing required '%s' field"%required 00284 else: 00285 #logger.debug("connection from %s:%s", client_addr[0], client_addr[1]) 00286 service_name = header['service'] 00287 00288 sm = self.service_manager 00289 md5sum = header['md5sum'] 00290 service = sm.get_service(service_name) 00291 if not service: 00292 return "[%s] is not a provider of [%s]"%(rospy.names.get_caller_id(), service_name) 00293 elif md5sum != rospy.names.SERVICE_ANYTYPE and md5sum != service.service_class._md5sum: 00294 return "request from [%s]: md5sums do not match: [%s] vs. [%s]"%(header['callerid'], md5sum, service.service_class._md5sum) 00295 else: 00296 transport = TCPROSTransport(service.protocol, service_name, header=header) 00297 transport.set_socket(sock, header['callerid']) 00298 transport.write_header() 00299 thread.start_new_thread(service.handle, (transport, header)) 00300 00301 def rosproxy_main(): 00302 00303 p = Proxy() 00304 p.connect() 00305 00306 # wait for shutdown 00307 rospy.spin()