00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 """Node that proxies ROS topics and services in order to bridge firewalls"""
00036
00037 import rospy
00038
00039 import rostopic
00040 import rosservice
00041
00042 import rosproxy.handler
00043 import rosproxy.tcpros
00044
00045
00046 import roslib.message
00047 import rospy.names
00048 import thread
00049 from rospy.impl.registration import Registration
00050 from rospy.impl.tcpros_base import TCPROSTransport, TCPROSTransportProtocol, TCPROSServer
00051 from rospy.service import ServiceManager
00052 from rospy.impl.tcpros_service import ServiceImpl
00053 from roslib.xmlrpc import XmlRpcNode
00054
00055 from new import classobj
00056
00057 import rospy.impl.registration
00058
00059 def pub_forwarder(msg, pub):
00060 """
00061 Forward a message to a supplied publisher
00062 """
00063 pub.publish(msg)
00064
00065
00066 class PassthroughServiceMessage(roslib.message.Message):
00067
00068 def __init__(self):
00069 self._buff = None
00070
00071 def serialize(self, buff):
00072 if self._buff is None:
00073 raise rospy.exceptions("AnyMsg is not initialized")
00074 else:
00075 buff.write(self._buff)
00076
00077 def deserialize(self, str):
00078 self._buff = str
00079 return self
00080
00081 class Proxy(object):
00082
00083 def __init__(self):
00084
00085 self.services = {}
00086
00087 self.service_manager = ServiceManager(registration_listeners=rospy.impl.registration.RegistrationListeners())
00088 self.topic_manager = rospy.topics._TopicManager()
00089
00090 self.subs_internal = {}
00091
00092 self.external_node = None
00093 self.external_tcpros = None
00094
00095 def connect(self):
00096 self.configure_internal()
00097 self.configure_external()
00098
00099 def _configure_proxy_services(self, external_tcpros):
00100 external_tcpros.service_connection_handler = self.service_connection_handler
00101
00102
00103
00104 i = 0
00105 for name, proxy in self.services.iteritems():
00106
00107 i += 1
00108 c = proxy.service_class
00109 service = ServiceImpl(name, proxy.service_class, proxy)
00110
00111 rospy.loginfo("registering proxied service %s"%(name))
00112 self.service_manager.register(name, service)
00113
00114 def _configure_proxy_topics(self, external_tcpros, tcpros_handler):
00115 external_tcpros.topic_connection_handler = tcpros_handler.topic_connection_handler
00116
00117 import rospy.topics
00118
00119
00120 subs_initialized = {}
00121 for resolved_name, sub in self.subs_internal.iteritems():
00122
00123 rospy.loginfo("create external publisher [%s]", resolved_name)
00124 pub = self.topic_manager.acquire_impl(Registration.PUB, resolved_name, sub.data_class)
00125
00126 sub_ex = rospy.Subscriber(resolved_name, sub.data_class, pub_forwarder, pub)
00127 subs_initialized[resolved_name] = sub_ex
00128
00129
00130
00131
00132
00133 del self.subs_internal
00134 self.subs_internal = subs_initialized
00135
00136
00137 def configure_external(self):
00138
00139
00140
00141
00142 self.external_tcpros = external_tcpros = TCPROSServer()
00143 tcpros_handler = rosproxy.tcpros.ProxyTCPROSHandler(self.topic_manager, external_tcpros)
00144
00145 self._configure_proxy_services(external_tcpros)
00146 self._configure_proxy_topics(external_tcpros, tcpros_handler)
00147
00148 tcpros_port = rospy.get_param('~tcpros_port', 11312)
00149 xmlrpc_port = rospy.get_param('~xmlrpc_port', 11313)
00150
00151 rospy.loginfo("reading port configuration: TCPROS[%s] XMLRPC[%s]"%(tcpros_port, xmlrpc_port))
00152
00153 external_tcpros.start_server(port=tcpros_port)
00154
00155
00156 rospy.loginfo("ROSRPC URI is rosrpc://%s:%s"%(roslib.network.get_local_address(), tcpros_port))
00157
00158
00159 master_uri = roslib.rosenv.get_master_uri()
00160 name = 'proxy-proxy'
00161
00162
00163 protocol_handlers = [tcpros_handler]
00164 rpc_handler = rosproxy.handler.ProxyHandler(name, master_uri, self.topic_manager, protocol_handlers)
00165 self.external_node = external_node= XmlRpcNode(xmlrpc_port, rpc_handler)
00166
00167
00168 external_node.start()
00169 import time
00170 timeout_t = time.time() + 10.
00171 while time.time() < timeout_t and external_node.uri is None:
00172 time.sleep(0.01)
00173
00174 rospy.loginfo("XMLRPC interface is up %s"%self.external_node.uri)
00175
00176
00177 def _configure_internal_services(self, service_names):
00178 """
00179 Create rospy handles to all of the services that we are
00180 configured to be able to call.
00181 """
00182
00183 i = 0
00184 for name in service_names:
00185 rospy.loginfo("configuring service %s", name)
00186 resolved_name = rospy.resolve_name(name)
00187 rospy.wait_for_service(name, timeout=60.)
00188 type_ = rosservice.get_service_type(resolved_name)
00189 if type_ is None:
00190 raise rospy.ROSInitException("cannot proxy service [%s]: unknown type"%resolved_name)
00191
00192 i += 1
00193
00194
00195
00196
00197
00198 real_service_class = roslib.message.get_service_class(type_)
00199 real_req = real_service_class._request_class
00200 real_resp = real_service_class._response_class
00201 request_d = {
00202 '_type': real_req._type,
00203 '_md5sum': real_req._md5sum,
00204 '_full_text': real_req._full_text,
00205 }
00206 response_d = {
00207 '_type': real_resp._type,
00208 '_md5sum': real_resp._md5sum,
00209 '_full_text': real_resp._full_text,
00210 }
00211 service_class = classobj('s_passthrough_%s'%i, (roslib.message.ServiceDefinition,), {
00212 '_type' : real_service_class._type,
00213 '_md5sum' : real_service_class._md5sum,
00214 '_request_class' : classobj('passthrough_request_%s'%i, (PassthroughServiceMessage, ), request_d),
00215 '_response_class' : classobj('passthrough_response_%s'%i, (PassthroughServiceMessage,), response_d),
00216 })
00217
00218 self.services[resolved_name] = rospy.ServiceProxy(name, service_class, persistent=True)
00219
00220 rospy.loginfo("proxying %s", resolved_name)
00221
00222 def _configure_internal_topics(self, pub_names):
00223 """
00224 Create subscriptions to all the topics that we externally publish.
00225 """
00226
00227 i = 0
00228 for name in pub_names:
00229 resolved_name = rospy.resolve_name(name)
00230 rospy.loginfo("configuring internal subscriber [%s]", resolved_name)
00231
00232 try:
00233 real_msg_class, _, _ = rostopic.get_topic_class(resolved_name)
00234 except rostopic.ROSTopicException:
00235 raise rospy.ROSInitException("cannot proxy subscription [%s]: unknown topic type"%resolved_name)
00236
00237 i += 1
00238 topic_class = classobj('t_passthrough_%s'%i, (rospy.msg.AnyMsg,), {
00239 '_type' : real_msg_class._type,
00240 '_md5sum' : real_msg_class._md5sum,
00241 })
00242 self.subs_internal[resolved_name] = rospy.Subscriber(name, topic_class)
00243
00244 rospy.loginfo("proxying %s", resolved_name)
00245
00246
00247 def configure_internal(self):
00248 """
00249 Bring up connections to internal ROS graph
00250 """
00251
00252 rospy.init_node('proxy')
00253
00254
00255
00256 service_names = rospy.get_param('~services', {})
00257 self._configure_internal_services(service_names)
00258
00259 pub_names = rospy.get_param('~pubs', {})
00260 self._configure_internal_topics(pub_names)
00261
00262
00263 def service_connection_handler(self, sock, client_addr, header):
00264 """
00265 @param sock: socket connection
00266 @type sock: socket
00267 @param client_addr: client address
00268 @type client_addr: (str, int)
00269 @param header: key/value pairs from handshake header
00270 @type header: dict
00271 @return: error string or None
00272 @rtype: str
00273 """
00274
00275
00276
00277
00278
00279
00280
00281 for required in ['service', 'md5sum', 'callerid']:
00282 if not required in header:
00283 return "Missing required '%s' field"%required
00284 else:
00285
00286 service_name = header['service']
00287
00288 sm = self.service_manager
00289 md5sum = header['md5sum']
00290 service = sm.get_service(service_name)
00291 if not service:
00292 return "[%s] is not a provider of [%s]"%(rospy.names.get_caller_id(), service_name)
00293 elif md5sum != rospy.names.SERVICE_ANYTYPE and md5sum != service.service_class._md5sum:
00294 return "request from [%s]: md5sums do not match: [%s] vs. [%s]"%(header['callerid'], md5sum, service.service_class._md5sum)
00295 else:
00296 transport = TCPROSTransport(service.protocol, service_name, header=header)
00297 transport.set_socket(sock, header['callerid'])
00298 transport.write_header()
00299 thread.start_new_thread(service.handle, (transport, header))
00300
00301 def rosproxy_main():
00302
00303 p = Proxy()
00304 p.connect()
00305
00306
00307 rospy.spin()