00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 """Node that proxies ROS topics and services in order to bridge firewalls"""
00036
00037 import thread
00038 from new import classobj
00039
00040
00041 import genpy
00042 import roslib.message
00043
00044 import rospy
00045 import rospy.names
00046
00047 from rospy.impl.registration import Registration, RegistrationListeners
00048 from rospy.impl.tcpros_base import TCPROSTransport, TCPROSTransportProtocol, TCPROSServer
00049 from rospy.service import ServiceManager
00050 from rospy.impl.tcpros_service import ServiceImpl
00051
00052 import rosgraph
00053 from rosgraph.xmlrpc import XmlRpcNode
00054 import rosgraph.network
00055
00056 import rostopic
00057 import rosservice
00058
00059 import rosproxy.handler
00060 import rosproxy.tcpros
00061
00062 def pub_forwarder(msg, pub):
00063 """
00064 Forward a message to a supplied publisher
00065 """
00066 pub.publish(msg)
00067
00068
00069 class PassthroughServiceMessage(genpy.Message):
00070
00071 def __init__(self):
00072 self._buff = None
00073
00074 def serialize(self, buff):
00075 if self._buff is None:
00076 raise rospy.exceptions("AnyMsg is not initialized")
00077 else:
00078 buff.write(self._buff)
00079
00080 def deserialize(self, str):
00081 self._buff = str
00082 return self
00083
00084 class Proxy(object):
00085
00086 def __init__(self):
00087
00088 self.services = {}
00089
00090 self.service_manager = ServiceManager(registration_listeners=RegistrationListeners())
00091 self.topic_manager = rospy.topics._TopicManager()
00092
00093 self.subs_internal = {}
00094
00095 self.external_node = None
00096 self.external_tcpros = None
00097
00098 def connect(self):
00099 self.configure_internal()
00100 self.configure_external()
00101
00102 def _configure_proxy_services(self, external_tcpros):
00103 external_tcpros.service_connection_handler = self.service_connection_handler
00104
00105
00106
00107 i = 0
00108 for name, proxy in self.services.iteritems():
00109
00110 i += 1
00111 c = proxy.service_class
00112 service = ServiceImpl(name, proxy.service_class, proxy)
00113
00114 rospy.loginfo("registering proxied service %s"%(name))
00115 self.service_manager.register(name, service)
00116
00117 def _configure_proxy_topics(self, external_tcpros, tcpros_handler):
00118 external_tcpros.topic_connection_handler = tcpros_handler.topic_connection_handler
00119
00120 import rospy.topics
00121
00122
00123 subs_initialized = {}
00124 for resolved_name, sub in self.subs_internal.iteritems():
00125
00126 rospy.loginfo("create external publisher [%s]", resolved_name)
00127 pub = self.topic_manager.acquire_impl(Registration.PUB, resolved_name, sub.data_class)
00128
00129 sub_ex = rospy.Subscriber(resolved_name, sub.data_class, pub_forwarder, pub)
00130 subs_initialized[resolved_name] = sub_ex
00131
00132
00133
00134
00135
00136 del self.subs_internal
00137 self.subs_internal = subs_initialized
00138
00139
00140 def configure_external(self):
00141
00142
00143
00144
00145 self.external_tcpros = external_tcpros = TCPROSServer()
00146 tcpros_handler = rosproxy.tcpros.ProxyTCPROSHandler(self.topic_manager, external_tcpros)
00147
00148 self._configure_proxy_services(external_tcpros)
00149 self._configure_proxy_topics(external_tcpros, tcpros_handler)
00150
00151 tcpros_port = rospy.get_param('~tcpros_port', 11312)
00152 xmlrpc_port = rospy.get_param('~xmlrpc_port', 11313)
00153
00154 rospy.loginfo("reading port configuration: TCPROS[%s] XMLRPC[%s]"%(tcpros_port, xmlrpc_port))
00155
00156 external_tcpros.start_server(port=tcpros_port)
00157
00158
00159 rospy.loginfo("ROSRPC URI is rosrpc://%s:%s"%(rosgraph.network.get_local_address(), tcpros_port))
00160
00161
00162 master_uri = rosgraph.get_master_uri()
00163 name = 'proxy-proxy'
00164
00165
00166 protocol_handlers = [tcpros_handler]
00167 rpc_handler = rosproxy.handler.ProxyHandler(name, master_uri, self.topic_manager, protocol_handlers)
00168 self.external_node = external_node= XmlRpcNode(xmlrpc_port, rpc_handler)
00169
00170
00171 external_node.start()
00172 import time
00173 timeout_t = time.time() + 10.
00174 while time.time() < timeout_t and external_node.uri is None:
00175 time.sleep(0.01)
00176
00177 rospy.loginfo("XMLRPC interface is up %s"%self.external_node.uri)
00178
00179
00180 def _configure_internal_services(self, service_names):
00181 """
00182 Create rospy handles to all of the services that we are
00183 configured to be able to call.
00184 """
00185
00186 i = 0
00187 for name in service_names:
00188 rospy.loginfo("configuring service %s", name)
00189 resolved_name = rospy.resolve_name(name)
00190 rospy.wait_for_service(name, timeout=60.)
00191 type_ = rosservice.get_service_type(resolved_name)
00192 if type_ is None:
00193 raise rospy.ROSInitException("cannot proxy service [%s]: unknown type"%resolved_name)
00194
00195 i += 1
00196
00197
00198
00199
00200
00201 real_service_class = roslib.message.get_service_class(type_)
00202 real_req = real_service_class._request_class
00203 real_resp = real_service_class._response_class
00204 request_d = {
00205 '_type': real_req._type,
00206 '_md5sum': real_req._md5sum,
00207 '_full_text': real_req._full_text,
00208 }
00209 response_d = {
00210 '_type': real_resp._type,
00211 '_md5sum': real_resp._md5sum,
00212 '_full_text': real_resp._full_text,
00213 }
00214 service_class = classobj('s_passthrough_%s'%i, (object,), {
00215 '_type' : real_service_class._type,
00216 '_md5sum' : real_service_class._md5sum,
00217 '_request_class' : classobj('passthrough_request_%s'%i, (PassthroughServiceMessage, ), request_d),
00218 '_response_class' : classobj('passthrough_response_%s'%i, (PassthroughServiceMessage,), response_d),
00219 })
00220
00221 self.services[resolved_name] = rospy.ServiceProxy(name, service_class, persistent=True)
00222
00223 rospy.loginfo("proxying %s", resolved_name)
00224
00225 def _configure_internal_topics(self, pub_names):
00226 """
00227 Create subscriptions to all the topics that we externally publish.
00228 """
00229
00230 i = 0
00231 for name in pub_names:
00232 resolved_name = rospy.resolve_name(name)
00233 rospy.loginfo("configuring internal subscriber [%s]", resolved_name)
00234
00235 try:
00236 real_msg_class, _, _ = rostopic.get_topic_class(resolved_name)
00237 except rostopic.ROSTopicException:
00238 raise rospy.ROSInitException("cannot proxy subscription [%s]: unknown topic type"%resolved_name)
00239
00240 i += 1
00241 topic_class = classobj('t_passthrough_%s'%i, (rospy.msg.AnyMsg,), {
00242 '_type' : real_msg_class._type,
00243 '_md5sum' : real_msg_class._md5sum,
00244 })
00245 self.subs_internal[resolved_name] = rospy.Subscriber(name, topic_class)
00246
00247 rospy.loginfo("proxying %s", resolved_name)
00248
00249
00250 def configure_internal(self):
00251 """
00252 Bring up connections to internal ROS graph
00253 """
00254
00255 rospy.init_node('proxy')
00256
00257
00258
00259 service_names = rospy.get_param('~services', {})
00260 self._configure_internal_services(service_names)
00261
00262 pub_names = rospy.get_param('~pubs', {})
00263 self._configure_internal_topics(pub_names)
00264
00265
00266 def service_connection_handler(self, sock, client_addr, header):
00267 """
00268 @param sock: socket connection
00269 @type sock: socket
00270 @param client_addr: client address
00271 @type client_addr: (str, int)
00272 @param header: key/value pairs from handshake header
00273 @type header: dict
00274 @return: error string or None
00275 @rtype: str
00276 """
00277
00278
00279
00280
00281
00282
00283
00284 for required in ['service', 'md5sum', 'callerid']:
00285 if not required in header:
00286 return "Missing required '%s' field"%required
00287 else:
00288
00289 service_name = header['service']
00290
00291 sm = self.service_manager
00292 md5sum = header['md5sum']
00293 service = sm.get_service(service_name)
00294 if not service:
00295 return "[%s] is not a provider of [%s]"%(rospy.names.get_caller_id(), service_name)
00296 elif md5sum != rospy.names.SERVICE_ANYTYPE and md5sum != service.service_class._md5sum:
00297 return "request from [%s]: md5sums do not match: [%s] vs. [%s]"%(header['callerid'], md5sum, service.service_class._md5sum)
00298 else:
00299 transport = TCPROSTransport(service.protocol, service_name, header=header)
00300 transport.set_socket(sock, header['callerid'])
00301 transport.write_header()
00302 thread.start_new_thread(service.handle, (transport, header))
00303
00304 def rosproxy_main():
00305
00306 p = Proxy()
00307 p.connect()
00308
00309
00310 rospy.spin()